From 0a2fe11a09fc88e0f41baab557249841158e82a6 Mon Sep 17 00:00:00 2001 From: David Justice Date: Wed, 9 Jan 2019 15:58:52 -0800 Subject: [PATCH 01/17] add abstraction for opentracing and opencensus --- go.mod | 1 + go.sum | 2 + opencensus/opencensus.go | 94 +++++++++++++++++++++++++++++++ opentracing/opentracing.go | 80 ++++++++++++++++++++++++++ trace/trace.go | 111 +++++++++++++++++++++++++++++++++++++ 5 files changed, 288 insertions(+) create mode 100644 opencensus/opencensus.go create mode 100644 opentracing/opentracing.go create mode 100644 trace/trace.go diff --git a/go.mod b/go.mod index 41a58bf..04f7fc3 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/fortytw2/leaktest v1.2.0 // indirect github.com/google/go-cmp v0.2.0 // indirect + github.com/opentracing/opentracing-go v1.0.2 github.com/pkg/errors v0.8.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.2.2 diff --git a/go.sum b/go.sum index 0baaf13..50b5cc6 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/fortytw2/leaktest v1.2.0 h1:cj6GCiwJDH7l3tMHLjZDo0QqPtrXJiWSI9JgpeQKw github.com/fortytw2/leaktest v1.2.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= +github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/opencensus/opencensus.go b/opencensus/opencensus.go new file mode 100644 index 0000000..5c2fe83 --- /dev/null +++ b/opencensus/opencensus.go @@ -0,0 +1,94 @@ +package opencensus + +import ( + "context" + "github.com/Azure/azure-amqp-common-go/trace" + oct "go.opencensus.io/trace" +) + +func init() { + trace.Register(new(Trace)) +} + +type ( + // Trace is the implementation of the OpenCensus trace abstraction + Trace struct{} + + // Span is the implementation of the OpenCensus Span abstraction + Span struct { + span *oct.Span + } +) + +// StartSpan starts a new child span of the current span in the context. If +// there is no span in the context, creates a new trace and span. +// +// Returned context contains the newly created span. You can use it to +// propagate the returned span in process. +func (t *Trace) StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, trace.Spanner) { + ctx, span := oct.StartSpan(ctx, operationName, toOCOption(opts...)...) + return ctx, &Span{span: span} +} + +// StartSpanWithRemoteParent starts a new child span of the span from the given parent. +// +// If the incoming context contains a parent, it ignores. StartSpanWithRemoteParent is +// preferred for cases where the parent is propagated via an incoming request. +// +// Returned context contains the newly created span. You can use it to +// propagate the returned span in process. +func (t *Trace) StartSpanWithRemoteParent(ctx context.Context, operationName string, reference interface{}, opts ...interface{}) (context.Context, trace.Spanner) { + if sp, ok := reference.(oct.SpanContext); ok { + ctx, span := oct.StartSpanWithRemoteParent(ctx, operationName, sp, toOCOption(opts...)...) + return ctx, &Span{span: span} + } + return t.StartSpan(ctx, operationName) +} + +// FromContext returns the Span stored in a context, or nil if there isn't one. +func (t *Trace) FromContext(ctx context.Context) trace.Spanner { + sp := oct.FromContext(ctx) + return &Span{span: sp} +} + +// AddAttributes sets attributes in the span. +// +// Existing attributes whose keys appear in the attributes parameter are overwritten. +func (s *Span) AddAttributes(attributes ...trace.Attribute) { + s.span.AddAttributes(attributesToOCAttributes(attributes...)...) +} + +// End ends the span. +func (s *Span) End() { + s.span.End() +} + +// Logger returns a trace.Logger for the span +func (s *Span) Logger() trace.Logger { + return &trace.SpanLogger{Span: s} +} + +func toOCOption(opts ...interface{}) []oct.StartOption { + var ocStartOptions []oct.StartOption + for _, opt := range opts { + if o, ok := opt.(oct.StartOption); ok { + ocStartOptions = append(ocStartOptions, o) + } + } + return ocStartOptions +} + +func attributesToOCAttributes(attributes ...trace.Attribute) []oct.Attribute { + var ocAttributes []oct.Attribute + for _, attr := range attributes { + switch attr.Value.(type) { + case int64: + ocAttributes = append(ocAttributes, oct.Int64Attribute(attr.Key, attr.Value.(int64))) + case string: + ocAttributes = append(ocAttributes, oct.StringAttribute(attr.Key, attr.Value.(string))) + case bool: + ocAttributes = append(ocAttributes, oct.BoolAttribute(attr.Key, attr.Value.(bool))) + } + } + return ocAttributes +} diff --git a/opentracing/opentracing.go b/opentracing/opentracing.go new file mode 100644 index 0000000..b05ba62 --- /dev/null +++ b/opentracing/opentracing.go @@ -0,0 +1,80 @@ +package opentracing + +import ( + "context" + "github.com/Azure/azure-amqp-common-go/trace" + "github.com/opentracing/opentracing-go" +) + +func init() { + trace.Register(new(Trace)) +} + +type ( + // Trace is the implementation of the OpenTracing trace abstraction + Trace struct{} + + // Span is the implementation of the OpenTracing Span abstraction + Span struct { + span opentracing.Span + } +) + +// StartSpan starts and returns a Span with `operationName`, using +// any Span found within `ctx` as a ChildOfRef. If no such parent could be +// found, StartSpanFromContext creates a root (parentless) Span. +func (t *Trace) StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, trace.Spanner) { + span, ctx := opentracing.StartSpanFromContext(ctx, operationName, toOTOption(opts...)...) + return ctx, &Span{span: span} +} + +// StartSpanWithRemoteParent starts and returns a Span with `operationName`, using +// reference span as FollowsFrom +func (t *Trace) StartSpanWithRemoteParent(ctx context.Context, operationName string, reference interface{}, opts ...interface{}) (context.Context, trace.Spanner) { + if sp, ok := reference.(opentracing.SpanContext); ok { + span := opentracing.StartSpan(operationName, append(toOTOption(opts...), opentracing.FollowsFrom(sp))...) + ctx = opentracing.ContextWithSpan(ctx, span) + return ctx, &Span{span: span} + } + return t.StartSpan(ctx, operationName) +} + +// FromContext returns the `Span` previously associated with `ctx`, or +// `nil` if no such `Span` could be found. +func (t *Trace) FromContext(ctx context.Context) trace.Spanner { + sp := opentracing.SpanFromContext(ctx) + return &Span{span: sp} +} + +// AddAttributes a tags to the span. +// +// If there is a pre-existing tag set for `key`, it is overwritten. +func (s *Span) AddAttributes(attributes ...trace.Attribute) { + for _, attr := range attributes { + s.span.SetTag(attr.Key, attr.Value) + } +} + +// End sets the end timestamp and finalizes Span state. +// +// With the exception of calls to Context() (which are always allowed), +// Finish() must be the last call made to any span instance, and to do +// otherwise leads to undefined behavior. +func (s *Span) End() { + s.span.Finish() +} + +// Logger returns a trace.Logger for the span +func (s *Span) Logger() trace.Logger { + return &trace.SpanLogger{Span: s} +} + +func toOTOption(opts ...interface{}) []opentracing.StartSpanOption { + var ocStartOptions []opentracing.StartSpanOption + for _, opt := range opts { + if o, ok := opt.(opentracing.StartSpanOption); ok { + ocStartOptions = append(ocStartOptions, o) + } + } + return ocStartOptions +} diff --git a/trace/trace.go b/trace/trace.go new file mode 100644 index 0000000..82c1a2e --- /dev/null +++ b/trace/trace.go @@ -0,0 +1,111 @@ +package trace + +import ( + "context" +) + +var tracer Tracer + +// Register a Tracer instance +func Register(t Tracer) { + tracer = t +} + +// BoolAttribute returns a bool-valued attribute. +func BoolAttribute(key string, value bool) Attribute { + return Attribute{Key: key, Value: value} +} + +// StringAttribute returns a string-valued attribute. +func StringAttribute(key, value string) Attribute { + return Attribute{Key: key, Value: value} +} + +// Int64Attribute returns an int64-valued attribute. +func Int64Attribute(key string, value int64) Attribute { + return Attribute{Key: key, Value: value} +} + +type ( + // Attribute is a key value pair for decorating spans + Attribute struct { + Key string + Value interface{} + } + + // Spanner is an abstraction over OpenTracing and OpenCensus Spans + Spanner interface { + AddAttributes(attributes ...Attribute) + End() + Logger() Logger + } + + // Tracer is an abstraction over OpenTracing and OpenCensus trace implementations + Tracer interface { + StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, Spanner) + StartSpanWithRemoteParent(ctx context.Context, operationName string, reference interface{}, opts ...interface{}) (context.Context, Spanner) + FromContext(ctx context.Context) Spanner + } + + // Logger is a generic interface for logging + Logger interface { + Info(msg string, attributes ...Attribute) + Error(err error, attributes ...Attribute) + Fatal(msg string, attributes ...Attribute) + Debug(msg string, attributes ...Attribute) + } + + // SpanLogger is a Logger implementation which logs to a tracing span + SpanLogger struct { + Span Spanner + } + + nopLogger struct{} +) + +// For will return a logger for a given context +func For(ctx context.Context) Logger { + if span := tracer.FromContext(ctx); span != nil { + return span.Logger() + } + return new(nopLogger) +} + +// Info logs an info tag with message to a span +func (sl SpanLogger) Info(msg string, attributes ...Attribute) { + sl.logToSpan("info", msg, attributes...) +} + +// Error logs an error tag with message to a span +func (sl SpanLogger) Error(err error, attributes ...Attribute) { + attributes = append(attributes, BoolAttribute("error", true)) + sl.logToSpan("error", err.Error(), attributes...) +} + +// Fatal logs an error tag with message to a span +func (sl SpanLogger) Fatal(msg string, attributes ...Attribute) { + attributes = append(attributes, BoolAttribute("error", true)) + sl.logToSpan("fatal", msg, attributes...) +} + +// Debug logs a debug tag with message to a span +func (sl SpanLogger) Debug(msg string, attributes ...Attribute) { + sl.logToSpan("debug", msg, attributes...) +} + +func (sl SpanLogger) logToSpan(level string, msg string, attributes ...Attribute) { + attrs := append(attributes, StringAttribute("event", msg), StringAttribute("level", level)) + sl.Span.AddAttributes(attrs...) +} + +// Info nops log entry +func (sl nopLogger) Info(msg string, attributes ...Attribute) {} + +// Error nops log entry +func (sl nopLogger) Error(err error, attributes ...Attribute) {} + +// Fatal nops log entry +func (sl nopLogger) Fatal(msg string, attributes ...Attribute) {} + +// Debug nops log entry +func (sl nopLogger) Debug(msg string, attributes ...Attribute) {} From 5d0b91d07d7aea7b04292d4752c466d96a33269f Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 10 Jan 2019 14:29:57 -0800 Subject: [PATCH 02/17] update ot and oc --- go.mod | 3 +-- go.sum | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 04f7fc3..5c9ac6e 100644 --- a/go.mod +++ b/go.mod @@ -6,12 +6,11 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/fortytw2/leaktest v1.2.0 // indirect - github.com/google/go-cmp v0.2.0 // indirect github.com/opentracing/opentracing-go v1.0.2 github.com/pkg/errors v0.8.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.2.2 - go.opencensus.io v0.15.0 + go.opencensus.io v0.18.0 golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4 golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 // indirect pack.ag/amqp v0.8.0 diff --git a/go.sum b/go.sum index 50b5cc6..947f403 100644 --- a/go.sum +++ b/go.sum @@ -1,28 +1,51 @@ +git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= github.com/Azure/azure-sdk-for-go v21.3.0+incompatible h1:YFvAka2WKAl2xnJkYV1e1b7E2z88AgFszDzWU18ejMY= github.com/Azure/azure-sdk-for-go v21.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-autorest v11.0.0+incompatible h1:eiSNFBmiWLzcjYsTkq8XZ0KRlvirsX0eyFQ8ZyCTgiQ= github.com/Azure/go-autorest v11.0.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/fortytw2/leaktest v1.2.0 h1:cj6GCiwJDH7l3tMHLjZDo0QqPtrXJiWSI9JgpeQKw+Q= github.com/fortytw2/leaktest v1.2.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= go.opencensus.io v0.15.0 h1:r1SzcjSm4ybA0qZs3B4QYX072f8gK61Kh0qtwyFpfdk= go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0= +go.opencensus.io v0.18.0 h1:Mk5rgZcggtbvtAun5aJzAtjKKN/t0R3jJPlWILlv938= +go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4 h1:Vk3wNqEZwyGyei9yq5ekj7frek2u7HUfffJ1/opblzc= golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= +google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= pack.ag/amqp v0.8.0 h1:JT0f88Hsbo5D+s8bBdleDOHvMDoYcaBW6GplAUqtxC4= pack.ag/amqp v0.8.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4= From c2c42c8bb320106a7c8805c516d3988c16a21a87 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 10 Jan 2019 14:49:28 -0800 Subject: [PATCH 03/17] add top level span methods --- opencensus/opencensus.go | 2 +- trace/trace.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/opencensus/opencensus.go b/opencensus/opencensus.go index 5c2fe83..2cbda75 100644 --- a/opencensus/opencensus.go +++ b/opencensus/opencensus.go @@ -58,7 +58,7 @@ func (s *Span) AddAttributes(attributes ...trace.Attribute) { s.span.AddAttributes(attributesToOCAttributes(attributes...)...) } -// End ends the span. +// End ends the span func (s *Span) End() { s.span.End() } diff --git a/trace/trace.go b/trace/trace.go index 82c1a2e..0ee1731 100644 --- a/trace/trace.go +++ b/trace/trace.go @@ -26,6 +26,30 @@ func Int64Attribute(key string, value int64) Attribute { return Attribute{Key: key, Value: value} } +// StartSpan starts a new child span +func StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, Spanner) { + if tracer == nil { + return ctx, new(nopSpanner) + } + return tracer.StartSpan(ctx, operationName, opts) +} + +// StartSpanWithRemoteParent starts a new child span of the span from the given parent. +func StartSpanWithRemoteParent(ctx context.Context, operationName string, reference interface{}, opts ...interface{}) (context.Context, Spanner) { + if tracer == nil { + return ctx, new(nopSpanner) + } + return tracer.StartSpanWithRemoteParent(ctx, operationName, reference, opts) +} + +// FromContext returns the Span stored in a context, or nil if there isn't one. +func FromContext(ctx context.Context) Spanner { + if tracer == nil { + return new(nopSpanner) + } + return tracer.FromContext(ctx) +} + type ( // Attribute is a key value pair for decorating spans Attribute struct { @@ -61,8 +85,21 @@ type ( } nopLogger struct{} + + nopSpanner struct{} ) +// AddAttributes is a nop +func (ns *nopSpanner) AddAttributes(attributes ...Attribute) {} + +// End is a nop +func (ns *nopSpanner) End() {} + +// Logger returns a nopLogger +func (ns *nopSpanner) Logger() Logger { + return nopLogger{} +} + // For will return a logger for a given context func For(ctx context.Context) Logger { if span := tracer.FromContext(ctx); span != nil { From 27a7af64ed7fa9e1fd8a68e4e6e77f3b5d6694c1 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 10 Jan 2019 16:05:12 -0800 Subject: [PATCH 04/17] add trace propagation --- opencensus/opencensus.go | 23 +++++++++++++++---- opentracing/opentracing.go | 40 ++++++++++++++++++++++++++++----- trace/trace.go | 46 ++++++++++++++++++++++++-------------- 3 files changed, 82 insertions(+), 27 deletions(-) diff --git a/opencensus/opencensus.go b/opencensus/opencensus.go index 2cbda75..d8659a8 100644 --- a/opencensus/opencensus.go +++ b/opencensus/opencensus.go @@ -4,12 +4,17 @@ import ( "context" "github.com/Azure/azure-amqp-common-go/trace" oct "go.opencensus.io/trace" + "go.opencensus.io/trace/propagation" ) func init() { trace.Register(new(Trace)) } +const ( + propagationKey = "_oc_prop" +) + type ( // Trace is the implementation of the OpenCensus trace abstraction Trace struct{} @@ -37,11 +42,15 @@ func (t *Trace) StartSpan(ctx context.Context, operationName string, opts ...int // // Returned context contains the newly created span. You can use it to // propagate the returned span in process. -func (t *Trace) StartSpanWithRemoteParent(ctx context.Context, operationName string, reference interface{}, opts ...interface{}) (context.Context, trace.Spanner) { - if sp, ok := reference.(oct.SpanContext); ok { - ctx, span := oct.StartSpanWithRemoteParent(ctx, operationName, sp, toOCOption(opts...)...) - return ctx, &Span{span: span} +func (t *Trace) StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier trace.Carrier, opts ...interface{}) (context.Context, trace.Spanner) { + keysValues := carrier.GetKeyValues() + if val, ok := keysValues[propagationKey]; ok { + if sc, ok := propagation.FromBinary(val.([]byte)); ok { + ctx, span := oct.StartSpanWithRemoteParent(ctx, operationName, sc) + return ctx, &Span{span: span} + } } + return t.StartSpan(ctx, operationName) } @@ -68,6 +77,12 @@ func (s *Span) Logger() trace.Logger { return &trace.SpanLogger{Span: s} } +// Inject propagation key onto the carrier +func (s *Span) Inject(carrier trace.Carrier) error { + carrier.Set(propagationKey, propagation.Binary(s.span.SpanContext())) + return nil +} + func toOCOption(opts ...interface{}) []oct.StartOption { var ocStartOptions []oct.StartOption for _, opt := range opts { diff --git a/opentracing/opentracing.go b/opentracing/opentracing.go index b05ba62..5fd2e6b 100644 --- a/opentracing/opentracing.go +++ b/opentracing/opentracing.go @@ -18,6 +18,10 @@ type ( Span struct { span opentracing.Span } + + carrierAdapter struct { + carrier trace.Carrier + } ) // StartSpan starts and returns a Span with `operationName`, using @@ -30,13 +34,15 @@ func (t *Trace) StartSpan(ctx context.Context, operationName string, opts ...int // StartSpanWithRemoteParent starts and returns a Span with `operationName`, using // reference span as FollowsFrom -func (t *Trace) StartSpanWithRemoteParent(ctx context.Context, operationName string, reference interface{}, opts ...interface{}) (context.Context, trace.Spanner) { - if sp, ok := reference.(opentracing.SpanContext); ok { - span := opentracing.StartSpan(operationName, append(toOTOption(opts...), opentracing.FollowsFrom(sp))...) - ctx = opentracing.ContextWithSpan(ctx, span) - return ctx, &Span{span: span} +func (t *Trace) StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier trace.Carrier, opts ...interface{}) (context.Context, trace.Spanner) { + sc, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, carrierAdapter{carrier: carrier}) + if err != nil { + return t.StartSpan(ctx, operationName) } - return t.StartSpan(ctx, operationName) + + span := opentracing.StartSpan(operationName, append(toOTOption(opts...), opentracing.FollowsFrom(sc))...) + ctx = opentracing.ContextWithSpan(ctx, span) + return ctx, &Span{span: span} } // FromContext returns the `Span` previously associated with `ctx`, or @@ -69,6 +75,28 @@ func (s *Span) Logger() trace.Logger { return &trace.SpanLogger{Span: s} } +// Inject span context into carrier +func (s *Span) Inject(carrier trace.Carrier) error { + return opentracing.GlobalTracer().Inject(s.span.Context(), opentracing.TextMap, carrierAdapter{carrier: carrier}) +} + +// Set a key and value on the carrier +func (ca *carrierAdapter) Set(key, value string) { + ca.carrier.Set(key, value) +} + +// ForeachKey runs the handler across the map of carrier key / values +func (ca *carrierAdapter) ForeachKey(handler func(key, val string) error) error { + for k, v := range ca.carrier.GetKeyValues() { + if vStr, ok := v.(string); ok { + if err := handler(k, vStr); err != nil { + return err + } + } + } + return nil +} + func toOTOption(opts ...interface{}) []opentracing.StartSpanOption { var ocStartOptions []opentracing.StartSpanOption for _, opt := range opts { diff --git a/trace/trace.go b/trace/trace.go index 0ee1731..6161e68 100644 --- a/trace/trace.go +++ b/trace/trace.go @@ -29,23 +29,23 @@ func Int64Attribute(key string, value int64) Attribute { // StartSpan starts a new child span func StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, Spanner) { if tracer == nil { - return ctx, new(nopSpanner) + return ctx, new(noOpSpanner) } return tracer.StartSpan(ctx, operationName, opts) } // StartSpanWithRemoteParent starts a new child span of the span from the given parent. -func StartSpanWithRemoteParent(ctx context.Context, operationName string, reference interface{}, opts ...interface{}) (context.Context, Spanner) { +func StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier Carrier, opts ...interface{}) (context.Context, Spanner) { if tracer == nil { - return ctx, new(nopSpanner) + return ctx, new(noOpSpanner) } - return tracer.StartSpanWithRemoteParent(ctx, operationName, reference, opts) + return tracer.StartSpanWithRemoteParent(ctx, operationName, carrier, opts) } // FromContext returns the Span stored in a context, or nil if there isn't one. func FromContext(ctx context.Context) Spanner { if tracer == nil { - return new(nopSpanner) + return new(noOpSpanner) } return tracer.FromContext(ctx) } @@ -57,17 +57,24 @@ type ( Value interface{} } + // Carrier is an abstraction over OpenTracing and OpenCensus propagation carrier + Carrier interface { + Set(key string, value interface{}) + GetKeyValues() map[string]interface{} + } + // Spanner is an abstraction over OpenTracing and OpenCensus Spans Spanner interface { AddAttributes(attributes ...Attribute) End() Logger() Logger + Inject(carrier Carrier) error } // Tracer is an abstraction over OpenTracing and OpenCensus trace implementations Tracer interface { StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, Spanner) - StartSpanWithRemoteParent(ctx context.Context, operationName string, reference interface{}, opts ...interface{}) (context.Context, Spanner) + StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier Carrier, opts ...interface{}) (context.Context, Spanner) FromContext(ctx context.Context) Spanner } @@ -84,20 +91,25 @@ type ( Span Spanner } - nopLogger struct{} + noOpLogger struct{} - nopSpanner struct{} + noOpSpanner struct{} ) // AddAttributes is a nop -func (ns *nopSpanner) AddAttributes(attributes ...Attribute) {} +func (ns *noOpSpanner) AddAttributes(attributes ...Attribute) {} // End is a nop -func (ns *nopSpanner) End() {} +func (ns *noOpSpanner) End() {} // Logger returns a nopLogger -func (ns *nopSpanner) Logger() Logger { - return nopLogger{} +func (ns *noOpSpanner) Logger() Logger { + return noOpLogger{} +} + +// Inject is a nop +func (ns *noOpSpanner) Inject(carrier Carrier) error { + return nil } // For will return a logger for a given context @@ -105,7 +117,7 @@ func For(ctx context.Context) Logger { if span := tracer.FromContext(ctx); span != nil { return span.Logger() } - return new(nopLogger) + return new(noOpLogger) } // Info logs an info tag with message to a span @@ -136,13 +148,13 @@ func (sl SpanLogger) logToSpan(level string, msg string, attributes ...Attribute } // Info nops log entry -func (sl nopLogger) Info(msg string, attributes ...Attribute) {} +func (sl noOpLogger) Info(msg string, attributes ...Attribute) {} // Error nops log entry -func (sl nopLogger) Error(err error, attributes ...Attribute) {} +func (sl noOpLogger) Error(err error, attributes ...Attribute) {} // Fatal nops log entry -func (sl nopLogger) Fatal(msg string, attributes ...Attribute) {} +func (sl noOpLogger) Fatal(msg string, attributes ...Attribute) {} // Debug nops log entry -func (sl nopLogger) Debug(msg string, attributes ...Attribute) {} +func (sl noOpLogger) Debug(msg string, attributes ...Attribute) {} From bca7481a738b7eb5a02a16bd7bb3813a6e24928f Mon Sep 17 00:00:00 2001 From: David Justice Date: Fri, 11 Jan 2019 11:06:03 -0800 Subject: [PATCH 05/17] add NewContext --- opencensus/opencensus.go | 13 +++++++++++++ opentracing/opentracing.go | 13 +++++++++++++ trace/trace.go | 7 +++++++ 3 files changed, 33 insertions(+) diff --git a/opencensus/opencensus.go b/opencensus/opencensus.go index d8659a8..6e5aed2 100644 --- a/opencensus/opencensus.go +++ b/opencensus/opencensus.go @@ -60,6 +60,14 @@ func (t *Trace) FromContext(ctx context.Context) trace.Spanner { return &Span{span: sp} } +// NewContext returns a new context with the given Span attached. +func (t *Trace) NewContext(ctx context.Context, span trace.Spanner) context.Context { + if sp, ok := span.InternalSpan().(*oct.Span); ok { + return oct.NewContext(ctx, sp) + } + return ctx +} + // AddAttributes sets attributes in the span. // // Existing attributes whose keys appear in the attributes parameter are overwritten. @@ -83,6 +91,11 @@ func (s *Span) Inject(carrier trace.Carrier) error { return nil } +// InternalSpan returns the real implementation of the Span +func (s *Span) InternalSpan() interface{} { + return s.span +} + func toOCOption(opts ...interface{}) []oct.StartOption { var ocStartOptions []oct.StartOption for _, opt := range opts { diff --git a/opentracing/opentracing.go b/opentracing/opentracing.go index 5fd2e6b..ac1c627 100644 --- a/opentracing/opentracing.go +++ b/opentracing/opentracing.go @@ -52,6 +52,14 @@ func (t *Trace) FromContext(ctx context.Context) trace.Spanner { return &Span{span: sp} } +// NewContext returns a new context with the given Span attached. +func (t *Trace) NewContext(ctx context.Context, span trace.Spanner) context.Context { + if sp, ok := span.InternalSpan().(opentracing.Span); ok { + return opentracing.ContextWithSpan(ctx, sp) + } + return ctx +} + // AddAttributes a tags to the span. // // If there is a pre-existing tag set for `key`, it is overwritten. @@ -80,6 +88,11 @@ func (s *Span) Inject(carrier trace.Carrier) error { return opentracing.GlobalTracer().Inject(s.span.Context(), opentracing.TextMap, carrierAdapter{carrier: carrier}) } +// InternalSpan returns the real implementation of the Span +func (s *Span) InternalSpan() interface{} { + return s.span +} + // Set a key and value on the carrier func (ca *carrierAdapter) Set(key, value string) { ca.carrier.Set(key, value) diff --git a/trace/trace.go b/trace/trace.go index 6161e68..fc7bfbc 100644 --- a/trace/trace.go +++ b/trace/trace.go @@ -69,6 +69,7 @@ type ( End() Logger() Logger Inject(carrier Carrier) error + InternalSpan() interface{} } // Tracer is an abstraction over OpenTracing and OpenCensus trace implementations @@ -76,6 +77,7 @@ type ( StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, Spanner) StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier Carrier, opts ...interface{}) (context.Context, Spanner) FromContext(ctx context.Context) Spanner + NewContext(parent context.Context, span Spanner) context.Context } // Logger is a generic interface for logging @@ -112,6 +114,11 @@ func (ns *noOpSpanner) Inject(carrier Carrier) error { return nil } +// InternalSpan returns nil +func (ns *noOpSpanner) InternalSpan() interface{} { + return nil +} + // For will return a logger for a given context func For(ctx context.Context) Logger { if span := tracer.FromContext(ctx); span != nil { From 2fc92250d022ea7cf3f5d4c5af999014e6bbd083 Mon Sep 17 00:00:00 2001 From: David Justice Date: Fri, 11 Jan 2019 11:08:39 -0800 Subject: [PATCH 06/17] add top level func --- trace/trace.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/trace/trace.go b/trace/trace.go index fc7bfbc..ffeeaa5 100644 --- a/trace/trace.go +++ b/trace/trace.go @@ -50,6 +50,14 @@ func FromContext(ctx context.Context) Spanner { return tracer.FromContext(ctx) } +// NewContext returns a new context with the given Span attached. +func NewContext(ctx context.Context, span Spanner) context.Context { + if tracer == nil { + return ctx + } + return tracer.NewContext(ctx, span) +} + type ( // Attribute is a key value pair for decorating spans Attribute struct { From 61041998010618e3dd399f359abc54e4c78ca3df Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 23 May 2019 15:12:45 -0700 Subject: [PATCH 07/17] use tab for logging and tracing --- Makefile | 12 +-- cbs/cbs.go | 13 +-- go.mod | 13 ++- go.sum | 105 +++++++++++++++------- internal/tracing/tracing.go | 19 ++-- log/logger.go | 61 ------------- opencensus/opencensus.go | 122 ------------------------- opentracing/opentracing.go | 121 ------------------------- rpc/rpc.go | 43 +++++---- trace/trace.go | 175 ------------------------------------ 10 files changed, 126 insertions(+), 558 deletions(-) delete mode 100644 log/logger.go delete mode 100644 opencensus/opencensus.go delete mode 100644 opentracing/opentracing.go delete mode 100644 trace/trace.go diff --git a/Makefile b/Makefile index b77ec82..371bf35 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ M = $(shell printf "\033[34;1m▶\033[0m") TIMEOUT = 360 .PHONY: all -all: fmt go.sum lint vet megacheck | $(BASE) ; $(info $(M) building library…) @ ## Build program +all: fmt go.sum lint vet tidy | $(BASE) ; $(info $(M) building library…) @ ## Build program $Q cd $(BASE) && $(GO) build \ -tags release \ -ldflags '-X $(PACKAGE)/cmd.Version=$(VERSION) -X $(PACKAGE)/cmd.BuildDate=$(DATE)' \ @@ -35,6 +35,10 @@ GOLINT = $(BIN)/golint $(BIN)/golint: | $(BASE) ; $(info $(M) building golint…) $Q go get -u golang.org/x/lint/golint +.PHONY: tidy +tidy: ; $(info $(M) running tidy…) @ ## Run tidy + $Q $(GO) mod tidy + # Tests TEST_TARGETS := test-default test-bench test-short test-verbose test-race test-debug @@ -47,7 +51,7 @@ test-race: ARGS=-race ## Run tests with race detector test-cover: ARGS=-cover ## Run tests in verbose mode with coverage $(TEST_TARGETS): NAME=$(MAKECMDGOALS:test-%=%) $(TEST_TARGETS): test -check test tests: cyclo lint vet go.sum megacheck | $(BASE) ; $(info $(M) running $(NAME:%=% )tests…) @ ## Run tests +check test tests: cyclo lint vet go.sum | $(BASE) ; $(info $(M) running $(NAME:%=% )tests…) @ ## Run tests $Q cd $(BASE) && $(GO) test -timeout $(TIMEOUT)s $(ARGS) $(TESTPKGS) .PHONY: vet @@ -60,10 +64,6 @@ lint: go.sum | $(BASE) $(GOLINT) ; $(info $(M) running golint…) @ ## Run golin test -z "$$($(GOLINT) $$pkg | tee /dev/stderr)" || ret=1 ; \ done ; exit $$ret -.PHONY: megacheck -megacheck: go.sum | $(BASE) ; $(info $(M) running megacheck…) @ ## Run megacheck - $Q cd $(BASE) && megacheck - .PHONY: fmt fmt: ; $(info $(M) running gofmt…) @ ## Run gofmt on all source files @ret=0 && for d in $$($(GO) list -f '{{.Dir}}' ./...); do \ diff --git a/cbs/cbs.go b/cbs/cbs.go index 7ccb403..b1a6c6a 100644 --- a/cbs/cbs.go +++ b/cbs/cbs.go @@ -29,11 +29,12 @@ import ( "fmt" "time" + "github.com/devigned/tab" + "pack.ag/amqp" + "github.com/Azure/azure-amqp-common-go/auth" "github.com/Azure/azure-amqp-common-go/internal/tracing" - "github.com/Azure/azure-amqp-common-go/log" "github.com/Azure/azure-amqp-common-go/rpc" - "pack.ag/amqp" ) const ( @@ -47,7 +48,7 @@ const ( // NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, provider auth.TokenProvider) error { - span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.cbs.NegotiateClaim") + ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.cbs.NegotiateClaim") defer span.End() link, err := rpc.NewLink(conn, cbsAddress) @@ -61,7 +62,7 @@ func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, pro return err } - log.For(ctx).Debug(fmt.Sprintf("negotiating claim for audience %s with token type %s and expiry of %s", audience, token.TokenType, token.Expiry)) + tab.For(ctx).Debug(fmt.Sprintf("negotiating claim for audience %s with token type %s and expiry of %s", audience, token.TokenType, token.Expiry)) msg := &amqp.Message{ Value: token.Token, ApplicationProperties: map[string]interface{}{ @@ -74,10 +75,10 @@ func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, pro res, err := link.RetryableRPC(ctx, 3, 1*time.Second, msg) if err != nil { - log.For(ctx).Error(err) + tab.For(ctx).Error(err) return err } - log.For(ctx).Debug(fmt.Sprintf("negotiated with response code %d and message: %s", res.Code, res.Description)) + tab.For(ctx).Debug(fmt.Sprintf("negotiated with response code %d and message: %s", res.Code, res.Description)) return nil } diff --git a/go.mod b/go.mod index 5c9ac6e..fd35734 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,16 @@ module github.com/Azure/azure-amqp-common-go require ( - github.com/Azure/azure-sdk-for-go v21.3.0+incompatible // indirect - github.com/Azure/go-autorest v11.0.0+incompatible + contrib.go.opencensus.io/exporter/ocagent v0.5.0 // indirect + github.com/Azure/azure-sdk-for-go v29.0.0+incompatible // indirect + github.com/Azure/go-autorest v12.0.0+incompatible github.com/davecgh/go-spew v1.1.1 // indirect + github.com/devigned/tab v0.1.0 github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/fortytw2/leaktest v1.2.0 // indirect - github.com/opentracing/opentracing-go v1.0.2 github.com/pkg/errors v0.8.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.2.2 - go.opencensus.io v0.18.0 - golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4 - golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 // indirect - pack.ag/amqp v0.8.0 + golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 + pack.ag/amqp v0.11.0 ) diff --git a/go.sum b/go.sum index 947f403..0a473be 100644 --- a/go.sum +++ b/go.sum @@ -1,51 +1,92 @@ -git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= -github.com/Azure/azure-sdk-for-go v21.3.0+incompatible h1:YFvAka2WKAl2xnJkYV1e1b7E2z88AgFszDzWU18ejMY= -github.com/Azure/azure-sdk-for-go v21.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/go-autorest v11.0.0+incompatible h1:eiSNFBmiWLzcjYsTkq8XZ0KRlvirsX0eyFQ8ZyCTgiQ= -github.com/Azure/go-autorest v11.0.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= -github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +contrib.go.opencensus.io/exporter/ocagent v0.5.0 h1:TKXjQSRS0/cCDrP7KvkgU6SmILtF/yV2TOs/02K/WZQ= +contrib.go.opencensus.io/exporter/ocagent v0.5.0/go.mod h1:ImxhfLRpxoYiSq891pBrLVhN+qmP8BTVvdH2YLs7Gl0= +github.com/Azure/azure-sdk-for-go v29.0.0+incompatible h1:CYPU39ULbGjQBo3gXIqiWouK0C4F+Pt2Zx5CqGvqknE= +github.com/Azure/azure-sdk-for-go v29.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/go-autorest v12.0.0+incompatible h1:N+VqClcomLGD/sHb3smbSYYtNMgKpVV3Cd5r5i8z6bQ= +github.com/Azure/go-autorest v12.0.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4= +github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/devigned/tab v0.1.0 h1:d2HOIeMxOKGiF5mUpu2WMANekUJO4u44BQl0hkpeKB4= +github.com/devigned/tab v0.1.0/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/fortytw2/leaktest v1.2.0 h1:cj6GCiwJDH7l3tMHLjZDo0QqPtrXJiWSI9JgpeQKw+Q= github.com/fortytw2/leaktest v1.2.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= -github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= -github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= -github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= +github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE= +github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= -github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= -github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= -github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -go.opencensus.io v0.15.0 h1:r1SzcjSm4ybA0qZs3B4QYX072f8gK61Kh0qtwyFpfdk= -go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0= -go.opencensus.io v0.18.0 h1:Mk5rgZcggtbvtAun5aJzAtjKKN/t0R3jJPlWILlv938= -go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= -golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4 h1:Vk3wNqEZwyGyei9yq5ekj7frek2u7HUfffJ1/opblzc= -golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM= -golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +go.opencensus.io v0.21.0 h1:mU6zScU4U1YAFPHEHYk+3JC4SY7JxgkqS10ZOSyksNg= +go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0Is3p+EHBKNgquIzo1OI= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= -google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -pack.ag/amqp v0.8.0 h1:JT0f88Hsbo5D+s8bBdleDOHvMDoYcaBW6GplAUqtxC4= -pack.ag/amqp v0.8.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +google.golang.org/api v0.4.0 h1:KKgc1aqhV8wDPbDzlDtpvyjZFY3vjz85FP7p4wcQUyI= +google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19 h1:Lj2SnHtxkRGJDqnGaSjo+CCdIieEnwVazbOXILwQemk= +google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +pack.ag/amqp v0.11.0 h1:ot/IA0enDkt4/c8xfbCO7AZzjM4bHys/UffnFmnHUnU= +pack.ag/amqp v0.11.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4= diff --git a/internal/tracing/tracing.go b/internal/tracing/tracing.go index 239ba39..8a03355 100644 --- a/internal/tracing/tracing.go +++ b/internal/tracing/tracing.go @@ -4,28 +4,29 @@ import ( "context" "os" + "github.com/devigned/tab" + "github.com/Azure/azure-amqp-common-go/internal" - "go.opencensus.io/trace" ) // StartSpanFromContext starts a span given a context and applies common library information -func StartSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) { - ctx, span := trace.StartSpan(ctx, operationName, opts...) +func StartSpanFromContext(ctx context.Context, operationName string) (context.Context, tab.Spanner) { + ctx, span := tab.StartSpan(ctx, operationName) ApplyComponentInfo(span) - return span, ctx + return ctx, span } // ApplyComponentInfo applies eventhub library and network info to the span -func ApplyComponentInfo(span *trace.Span) { +func ApplyComponentInfo(span tab.Spanner) { span.AddAttributes( - trace.StringAttribute("component", "github.com/Azure/azure-amqp-common-go"), - trace.StringAttribute("version", common.Version)) + tab.StringAttribute("component", "github.com/Azure/azure-amqp-common-go"), + tab.StringAttribute("version", common.Version)) applyNetworkInfo(span) } -func applyNetworkInfo(span *trace.Span) { +func applyNetworkInfo(span tab.Spanner) { hostname, err := os.Hostname() if err == nil { - span.AddAttributes(trace.StringAttribute("peer.hostname", hostname)) + span.AddAttributes(tab.StringAttribute("peer.hostname", hostname)) } } diff --git a/log/logger.go b/log/logger.go deleted file mode 100644 index 7bdcf6d..0000000 --- a/log/logger.go +++ /dev/null @@ -1,61 +0,0 @@ -package log - -import ( - "context" - - "go.opencensus.io/trace" -) - -type ( - // Logger is the interface for opentracing logging - Logger interface { - Info(msg string, attributes ...trace.Attribute) - Error(err error, attributes ...trace.Attribute) - Fatal(msg string, attributes ...trace.Attribute) - Debug(msg string, attributes ...trace.Attribute) - } - - spanLogger struct { - span *trace.Span - } - - nopLogger struct{} -) - -// For will return a logger for a given context -func For(ctx context.Context) Logger { - if span := trace.FromContext(ctx); span != nil { - return &spanLogger{ - span: span, - } - } - return new(nopLogger) -} - -func (sl spanLogger) Info(msg string, attributes ...trace.Attribute) { - sl.logToSpan("info", msg, attributes...) -} - -func (sl spanLogger) Error(err error, attributes ...trace.Attribute) { - attributes = append(attributes, trace.BoolAttribute("error", true)) - sl.logToSpan("error", err.Error(), attributes...) -} - -func (sl spanLogger) Fatal(msg string, attributes ...trace.Attribute) { - attributes = append(attributes, trace.BoolAttribute("error", true)) - sl.logToSpan("fatal", msg, attributes...) -} - -func (sl spanLogger) Debug(msg string, attributes ...trace.Attribute) { - sl.logToSpan("debug", msg, attributes...) -} - -func (sl spanLogger) logToSpan(level string, msg string, attributes ...trace.Attribute) { - attrs := append(attributes, trace.StringAttribute("event", msg), trace.StringAttribute("level", level)) - sl.span.AddAttributes(attrs...) -} - -func (sl nopLogger) Info(msg string, attributes ...trace.Attribute) {} -func (sl nopLogger) Error(err error, attributes ...trace.Attribute) {} -func (sl nopLogger) Fatal(msg string, attributes ...trace.Attribute) {} -func (sl nopLogger) Debug(msg string, attributes ...trace.Attribute) {} diff --git a/opencensus/opencensus.go b/opencensus/opencensus.go deleted file mode 100644 index 6e5aed2..0000000 --- a/opencensus/opencensus.go +++ /dev/null @@ -1,122 +0,0 @@ -package opencensus - -import ( - "context" - "github.com/Azure/azure-amqp-common-go/trace" - oct "go.opencensus.io/trace" - "go.opencensus.io/trace/propagation" -) - -func init() { - trace.Register(new(Trace)) -} - -const ( - propagationKey = "_oc_prop" -) - -type ( - // Trace is the implementation of the OpenCensus trace abstraction - Trace struct{} - - // Span is the implementation of the OpenCensus Span abstraction - Span struct { - span *oct.Span - } -) - -// StartSpan starts a new child span of the current span in the context. If -// there is no span in the context, creates a new trace and span. -// -// Returned context contains the newly created span. You can use it to -// propagate the returned span in process. -func (t *Trace) StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, trace.Spanner) { - ctx, span := oct.StartSpan(ctx, operationName, toOCOption(opts...)...) - return ctx, &Span{span: span} -} - -// StartSpanWithRemoteParent starts a new child span of the span from the given parent. -// -// If the incoming context contains a parent, it ignores. StartSpanWithRemoteParent is -// preferred for cases where the parent is propagated via an incoming request. -// -// Returned context contains the newly created span. You can use it to -// propagate the returned span in process. -func (t *Trace) StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier trace.Carrier, opts ...interface{}) (context.Context, trace.Spanner) { - keysValues := carrier.GetKeyValues() - if val, ok := keysValues[propagationKey]; ok { - if sc, ok := propagation.FromBinary(val.([]byte)); ok { - ctx, span := oct.StartSpanWithRemoteParent(ctx, operationName, sc) - return ctx, &Span{span: span} - } - } - - return t.StartSpan(ctx, operationName) -} - -// FromContext returns the Span stored in a context, or nil if there isn't one. -func (t *Trace) FromContext(ctx context.Context) trace.Spanner { - sp := oct.FromContext(ctx) - return &Span{span: sp} -} - -// NewContext returns a new context with the given Span attached. -func (t *Trace) NewContext(ctx context.Context, span trace.Spanner) context.Context { - if sp, ok := span.InternalSpan().(*oct.Span); ok { - return oct.NewContext(ctx, sp) - } - return ctx -} - -// AddAttributes sets attributes in the span. -// -// Existing attributes whose keys appear in the attributes parameter are overwritten. -func (s *Span) AddAttributes(attributes ...trace.Attribute) { - s.span.AddAttributes(attributesToOCAttributes(attributes...)...) -} - -// End ends the span -func (s *Span) End() { - s.span.End() -} - -// Logger returns a trace.Logger for the span -func (s *Span) Logger() trace.Logger { - return &trace.SpanLogger{Span: s} -} - -// Inject propagation key onto the carrier -func (s *Span) Inject(carrier trace.Carrier) error { - carrier.Set(propagationKey, propagation.Binary(s.span.SpanContext())) - return nil -} - -// InternalSpan returns the real implementation of the Span -func (s *Span) InternalSpan() interface{} { - return s.span -} - -func toOCOption(opts ...interface{}) []oct.StartOption { - var ocStartOptions []oct.StartOption - for _, opt := range opts { - if o, ok := opt.(oct.StartOption); ok { - ocStartOptions = append(ocStartOptions, o) - } - } - return ocStartOptions -} - -func attributesToOCAttributes(attributes ...trace.Attribute) []oct.Attribute { - var ocAttributes []oct.Attribute - for _, attr := range attributes { - switch attr.Value.(type) { - case int64: - ocAttributes = append(ocAttributes, oct.Int64Attribute(attr.Key, attr.Value.(int64))) - case string: - ocAttributes = append(ocAttributes, oct.StringAttribute(attr.Key, attr.Value.(string))) - case bool: - ocAttributes = append(ocAttributes, oct.BoolAttribute(attr.Key, attr.Value.(bool))) - } - } - return ocAttributes -} diff --git a/opentracing/opentracing.go b/opentracing/opentracing.go deleted file mode 100644 index ac1c627..0000000 --- a/opentracing/opentracing.go +++ /dev/null @@ -1,121 +0,0 @@ -package opentracing - -import ( - "context" - "github.com/Azure/azure-amqp-common-go/trace" - "github.com/opentracing/opentracing-go" -) - -func init() { - trace.Register(new(Trace)) -} - -type ( - // Trace is the implementation of the OpenTracing trace abstraction - Trace struct{} - - // Span is the implementation of the OpenTracing Span abstraction - Span struct { - span opentracing.Span - } - - carrierAdapter struct { - carrier trace.Carrier - } -) - -// StartSpan starts and returns a Span with `operationName`, using -// any Span found within `ctx` as a ChildOfRef. If no such parent could be -// found, StartSpanFromContext creates a root (parentless) Span. -func (t *Trace) StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, trace.Spanner) { - span, ctx := opentracing.StartSpanFromContext(ctx, operationName, toOTOption(opts...)...) - return ctx, &Span{span: span} -} - -// StartSpanWithRemoteParent starts and returns a Span with `operationName`, using -// reference span as FollowsFrom -func (t *Trace) StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier trace.Carrier, opts ...interface{}) (context.Context, trace.Spanner) { - sc, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, carrierAdapter{carrier: carrier}) - if err != nil { - return t.StartSpan(ctx, operationName) - } - - span := opentracing.StartSpan(operationName, append(toOTOption(opts...), opentracing.FollowsFrom(sc))...) - ctx = opentracing.ContextWithSpan(ctx, span) - return ctx, &Span{span: span} -} - -// FromContext returns the `Span` previously associated with `ctx`, or -// `nil` if no such `Span` could be found. -func (t *Trace) FromContext(ctx context.Context) trace.Spanner { - sp := opentracing.SpanFromContext(ctx) - return &Span{span: sp} -} - -// NewContext returns a new context with the given Span attached. -func (t *Trace) NewContext(ctx context.Context, span trace.Spanner) context.Context { - if sp, ok := span.InternalSpan().(opentracing.Span); ok { - return opentracing.ContextWithSpan(ctx, sp) - } - return ctx -} - -// AddAttributes a tags to the span. -// -// If there is a pre-existing tag set for `key`, it is overwritten. -func (s *Span) AddAttributes(attributes ...trace.Attribute) { - for _, attr := range attributes { - s.span.SetTag(attr.Key, attr.Value) - } -} - -// End sets the end timestamp and finalizes Span state. -// -// With the exception of calls to Context() (which are always allowed), -// Finish() must be the last call made to any span instance, and to do -// otherwise leads to undefined behavior. -func (s *Span) End() { - s.span.Finish() -} - -// Logger returns a trace.Logger for the span -func (s *Span) Logger() trace.Logger { - return &trace.SpanLogger{Span: s} -} - -// Inject span context into carrier -func (s *Span) Inject(carrier trace.Carrier) error { - return opentracing.GlobalTracer().Inject(s.span.Context(), opentracing.TextMap, carrierAdapter{carrier: carrier}) -} - -// InternalSpan returns the real implementation of the Span -func (s *Span) InternalSpan() interface{} { - return s.span -} - -// Set a key and value on the carrier -func (ca *carrierAdapter) Set(key, value string) { - ca.carrier.Set(key, value) -} - -// ForeachKey runs the handler across the map of carrier key / values -func (ca *carrierAdapter) ForeachKey(handler func(key, val string) error) error { - for k, v := range ca.carrier.GetKeyValues() { - if vStr, ok := v.(string); ok { - if err := handler(k, vStr); err != nil { - return err - } - } - } - return nil -} - -func toOTOption(opts ...interface{}) []opentracing.StartSpanOption { - var ocStartOptions []opentracing.StartSpanOption - for _, opt := range opts { - if o, ok := opt.(opentracing.StartSpanOption); ok { - ocStartOptions = append(ocStartOptions, o) - } - } - return ocStartOptions -} diff --git a/rpc/rpc.go b/rpc/rpc.go index 10487fc..8f57edc 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -31,11 +31,12 @@ import ( "sync" "time" + "github.com/devigned/tab" + "pack.ag/amqp" + "github.com/Azure/azure-amqp-common-go" "github.com/Azure/azure-amqp-common-go/internal/tracing" - "github.com/Azure/azure-amqp-common-go/log" "github.com/Azure/azure-amqp-common-go/uuid" - "pack.ag/amqp" ) const ( @@ -70,12 +71,11 @@ func NewLink(conn *amqp.Client, address string) (*Link, error) { return nil, err } - return NewLinkWithSession(conn, authSession, address) + return NewLinkWithSession(authSession, address) } // NewLinkWithSession will build a new request response link, but will reuse an existing AMQP session -func NewLinkWithSession(conn *amqp.Client, session *amqp.Session, address string) (*Link, error) { - +func NewLinkWithSession(session *amqp.Session, address string) (*Link, error) { authSender, err := session.NewSender( amqp.LinkTargetAddress(address), ) @@ -109,30 +109,30 @@ func NewLinkWithSession(conn *amqp.Client, session *amqp.Session, address string // RetryableRPC attempts to retry a request a number of times with delay func (l *Link) RetryableRPC(ctx context.Context, times int, delay time.Duration, msg *amqp.Message) (*Response, error) { - span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC") + ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC") defer span.End() res, err := common.Retry(times, delay, func() (interface{}, error) { - span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC.retry") + ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC.retry") defer span.End() res, err := l.RPC(ctx, msg) if err != nil { - log.For(ctx).Error(fmt.Errorf("error in RPC via link %s: %v", l.id, err)) + tab.For(ctx).Error(fmt.Errorf("error in RPC via link %s: %v", l.id, err)) return nil, err } switch { case res.Code >= 200 && res.Code < 300: - log.For(ctx).Debug(fmt.Sprintf("successful rpc on link %s: status code %d and description: %s", l.id, res.Code, res.Description)) + tab.For(ctx).Debug(fmt.Sprintf("successful rpc on link %s: status code %d and description: %s", l.id, res.Code, res.Description)) return res, nil case res.Code >= 500: errMessage := fmt.Sprintf("server error link %s: status code %d and description: %s", l.id, res.Code, res.Description) - log.For(ctx).Error(errors.New(errMessage)) + tab.For(ctx).Error(errors.New(errMessage)) return nil, common.Retryable(errMessage) default: errMessage := fmt.Sprintf("unhandled error link %s: status code %d and description: %s", l.id, res.Code, res.Description) - log.For(ctx).Error(errors.New(errMessage)) + tab.For(ctx).Error(errors.New(errMessage)) return nil, common.Retryable(errMessage) } }) @@ -149,7 +149,7 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) { l.rpcMu.Lock() defer l.rpcMu.Unlock() - span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RPC") + ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RPC") defer span.End() if msg.Properties == nil { @@ -205,17 +205,22 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) { } } - res.Accept() - return &Response{ + response := &Response{ Code: int(statusCode), Description: description, Message: res, - }, err + } + + if err := res.Accept(); err != nil { + return response, err + } + + return response, err } // Close the link receiver, sender and session func (l *Link) Close(ctx context.Context) error { - span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.Close") + ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.Close") defer span.End() if err := l.closeReceiver(ctx); err != nil { @@ -233,7 +238,7 @@ func (l *Link) Close(ctx context.Context) error { } func (l *Link) closeReceiver(ctx context.Context) error { - span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeReceiver") + ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeReceiver") defer span.End() if l.receiver != nil { @@ -243,7 +248,7 @@ func (l *Link) closeReceiver(ctx context.Context) error { } func (l *Link) closeSender(ctx context.Context) error { - span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSender") + ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSender") defer span.End() if l.sender != nil { @@ -253,7 +258,7 @@ func (l *Link) closeSender(ctx context.Context) error { } func (l *Link) closeSession(ctx context.Context) error { - span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSession") + ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSession") defer span.End() if l.session != nil { diff --git a/trace/trace.go b/trace/trace.go deleted file mode 100644 index ffeeaa5..0000000 --- a/trace/trace.go +++ /dev/null @@ -1,175 +0,0 @@ -package trace - -import ( - "context" -) - -var tracer Tracer - -// Register a Tracer instance -func Register(t Tracer) { - tracer = t -} - -// BoolAttribute returns a bool-valued attribute. -func BoolAttribute(key string, value bool) Attribute { - return Attribute{Key: key, Value: value} -} - -// StringAttribute returns a string-valued attribute. -func StringAttribute(key, value string) Attribute { - return Attribute{Key: key, Value: value} -} - -// Int64Attribute returns an int64-valued attribute. -func Int64Attribute(key string, value int64) Attribute { - return Attribute{Key: key, Value: value} -} - -// StartSpan starts a new child span -func StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, Spanner) { - if tracer == nil { - return ctx, new(noOpSpanner) - } - return tracer.StartSpan(ctx, operationName, opts) -} - -// StartSpanWithRemoteParent starts a new child span of the span from the given parent. -func StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier Carrier, opts ...interface{}) (context.Context, Spanner) { - if tracer == nil { - return ctx, new(noOpSpanner) - } - return tracer.StartSpanWithRemoteParent(ctx, operationName, carrier, opts) -} - -// FromContext returns the Span stored in a context, or nil if there isn't one. -func FromContext(ctx context.Context) Spanner { - if tracer == nil { - return new(noOpSpanner) - } - return tracer.FromContext(ctx) -} - -// NewContext returns a new context with the given Span attached. -func NewContext(ctx context.Context, span Spanner) context.Context { - if tracer == nil { - return ctx - } - return tracer.NewContext(ctx, span) -} - -type ( - // Attribute is a key value pair for decorating spans - Attribute struct { - Key string - Value interface{} - } - - // Carrier is an abstraction over OpenTracing and OpenCensus propagation carrier - Carrier interface { - Set(key string, value interface{}) - GetKeyValues() map[string]interface{} - } - - // Spanner is an abstraction over OpenTracing and OpenCensus Spans - Spanner interface { - AddAttributes(attributes ...Attribute) - End() - Logger() Logger - Inject(carrier Carrier) error - InternalSpan() interface{} - } - - // Tracer is an abstraction over OpenTracing and OpenCensus trace implementations - Tracer interface { - StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, Spanner) - StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier Carrier, opts ...interface{}) (context.Context, Spanner) - FromContext(ctx context.Context) Spanner - NewContext(parent context.Context, span Spanner) context.Context - } - - // Logger is a generic interface for logging - Logger interface { - Info(msg string, attributes ...Attribute) - Error(err error, attributes ...Attribute) - Fatal(msg string, attributes ...Attribute) - Debug(msg string, attributes ...Attribute) - } - - // SpanLogger is a Logger implementation which logs to a tracing span - SpanLogger struct { - Span Spanner - } - - noOpLogger struct{} - - noOpSpanner struct{} -) - -// AddAttributes is a nop -func (ns *noOpSpanner) AddAttributes(attributes ...Attribute) {} - -// End is a nop -func (ns *noOpSpanner) End() {} - -// Logger returns a nopLogger -func (ns *noOpSpanner) Logger() Logger { - return noOpLogger{} -} - -// Inject is a nop -func (ns *noOpSpanner) Inject(carrier Carrier) error { - return nil -} - -// InternalSpan returns nil -func (ns *noOpSpanner) InternalSpan() interface{} { - return nil -} - -// For will return a logger for a given context -func For(ctx context.Context) Logger { - if span := tracer.FromContext(ctx); span != nil { - return span.Logger() - } - return new(noOpLogger) -} - -// Info logs an info tag with message to a span -func (sl SpanLogger) Info(msg string, attributes ...Attribute) { - sl.logToSpan("info", msg, attributes...) -} - -// Error logs an error tag with message to a span -func (sl SpanLogger) Error(err error, attributes ...Attribute) { - attributes = append(attributes, BoolAttribute("error", true)) - sl.logToSpan("error", err.Error(), attributes...) -} - -// Fatal logs an error tag with message to a span -func (sl SpanLogger) Fatal(msg string, attributes ...Attribute) { - attributes = append(attributes, BoolAttribute("error", true)) - sl.logToSpan("fatal", msg, attributes...) -} - -// Debug logs a debug tag with message to a span -func (sl SpanLogger) Debug(msg string, attributes ...Attribute) { - sl.logToSpan("debug", msg, attributes...) -} - -func (sl SpanLogger) logToSpan(level string, msg string, attributes ...Attribute) { - attrs := append(attributes, StringAttribute("event", msg), StringAttribute("level", level)) - sl.Span.AddAttributes(attrs...) -} - -// Info nops log entry -func (sl noOpLogger) Info(msg string, attributes ...Attribute) {} - -// Error nops log entry -func (sl noOpLogger) Error(err error, attributes ...Attribute) {} - -// Fatal nops log entry -func (sl noOpLogger) Fatal(msg string, attributes ...Attribute) {} - -// Debug nops log entry -func (sl noOpLogger) Debug(msg string, attributes ...Attribute) {} From f3083d80d0bb995221d8ccb124e5455c725e8e77 Mon Sep 17 00:00:00 2001 From: David Justice Date: Wed, 29 May 2019 14:23:51 -0700 Subject: [PATCH 08/17] update tab dep --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index fd35734..4076fbf 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ require ( github.com/Azure/azure-sdk-for-go v29.0.0+incompatible // indirect github.com/Azure/go-autorest v12.0.0+incompatible github.com/davecgh/go-spew v1.1.1 // indirect - github.com/devigned/tab v0.1.0 + github.com/devigned/tab v0.1.1 github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/fortytw2/leaktest v1.2.0 // indirect github.com/pkg/errors v0.8.0 // indirect diff --git a/go.sum b/go.sum index 0a473be..dbea08f 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/devigned/tab v0.1.0 h1:d2HOIeMxOKGiF5mUpu2WMANekUJO4u44BQl0hkpeKB4= -github.com/devigned/tab v0.1.0/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= +github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA= +github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/fortytw2/leaktest v1.2.0 h1:cj6GCiwJDH7l3tMHLjZDo0QqPtrXJiWSI9JgpeQKw+Q= From b92ea57d45503b368622f728e59dbc6afd67f55c Mon Sep 17 00:00:00 2001 From: David Justice Date: Wed, 29 May 2019 15:25:32 -0700 Subject: [PATCH 09/17] remove persist --- persist/checkpoint.go | 69 ----------------------------- persist/file.go | 100 ------------------------------------------ persist/file_test.go | 80 --------------------------------- persist/persist.go | 81 ---------------------------------- 4 files changed, 330 deletions(-) delete mode 100644 persist/checkpoint.go delete mode 100644 persist/file.go delete mode 100644 persist/file_test.go delete mode 100644 persist/persist.go diff --git a/persist/checkpoint.go b/persist/checkpoint.go deleted file mode 100644 index aa0766f..0000000 --- a/persist/checkpoint.go +++ /dev/null @@ -1,69 +0,0 @@ -package persist - -// MIT License -// -// Copyright (c) Microsoft Corporation. All rights reserved. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE - -import ( - "time" -) - -const ( - // StartOfStream is a constant defined to represent the start of a partition stream in EventHub. - StartOfStream = "-1" - - // EndOfStream is a constant defined to represent the current end of a partition stream in EventHub. - // This can be used as an offset argument in receiver creation to start receiving from the latest - // event, instead of a specific offset or point in time. - EndOfStream = "@latest" -) - -type ( - // Checkpoint is the information needed to determine the last message processed - Checkpoint struct { - Offset string `json:"offset"` - SequenceNumber int64 `json:"sequenceNumber"` - EnqueueTime time.Time `json:"enqueueTime"` - } -) - -// NewCheckpointFromStartOfStream returns a checkpoint for the start of the stream -func NewCheckpointFromStartOfStream() Checkpoint { - return Checkpoint{ - Offset: StartOfStream, - } -} - -// NewCheckpointFromEndOfStream returns a checkpoint for the end of the stream -func NewCheckpointFromEndOfStream() Checkpoint { - return Checkpoint{ - Offset: EndOfStream, - } -} - -// NewCheckpoint contains the information needed to checkpoint Event Hub progress -func NewCheckpoint(offset string, sequence int64, enqueueTime time.Time) Checkpoint { - return Checkpoint{ - Offset: offset, - SequenceNumber: sequence, - EnqueueTime: enqueueTime, - } -} diff --git a/persist/file.go b/persist/file.go deleted file mode 100644 index 8074f88..0000000 --- a/persist/file.go +++ /dev/null @@ -1,100 +0,0 @@ -package persist - -// MIT License -// -// Copyright (c) Microsoft Corporation. All rights reserved. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE - -import ( - "bytes" - "encoding/json" - "io" - "os" - "path" - "strings" - "sync" -) - -type ( - // FilePersister implements CheckpointPersister for saving to the file system - FilePersister struct { - directory string - mu sync.Mutex - } -) - -// NewFilePersister creates a FilePersister for saving to a given directory -func NewFilePersister(directory string) (*FilePersister, error) { - err := os.MkdirAll(directory, 0777) - return &FilePersister{ - directory: directory, - }, err -} - -func (fp *FilePersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error { - fp.mu.Lock() - defer fp.mu.Unlock() - - key := getFilePath(namespace, name, consumerGroup, partitionID) - filePath := path.Join(fp.directory, key) - bits, err := json.Marshal(checkpoint) - if err != nil { - return err - } - - file, err := os.Create(filePath) - if err != nil { - return err - } - _, err = file.Write(bits) - if err != nil { - return err - } - - return file.Close() -} - -func (fp *FilePersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error) { - fp.mu.Lock() - defer fp.mu.Unlock() - - key := getFilePath(namespace, name, consumerGroup, partitionID) - filePath := path.Join(fp.directory, key) - - f, err := os.Open(filePath) - if err != nil { - return NewCheckpointFromStartOfStream(), nil - } - - buf := bytes.NewBuffer(nil) - _, err = io.Copy(buf, f) - if err != nil { - return NewCheckpointFromStartOfStream(), err - } - - var checkpoint Checkpoint - err = json.Unmarshal(buf.Bytes(), &checkpoint) - return checkpoint, err -} - -func getFilePath(namespace, name, consumerGroup, partitionID string) string { - key := strings.Join([]string{namespace, name, consumerGroup, partitionID}, "_") - return strings.Replace(key, "$", "", -1) -} diff --git a/persist/file_test.go b/persist/file_test.go deleted file mode 100644 index e679090..0000000 --- a/persist/file_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package persist - -// MIT License -// -// Copyright (c) Microsoft Corporation. All rights reserved. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE - -import ( - "math/rand" - "os" - "path" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -var ( - letterRunes = []rune("abcdefghijklmnopqrstuvwxyz123456789") -) - -func init() { - rand.Seed(time.Now().Unix()) -} - -func TestFilePersister_Read(t *testing.T) { - namespace := "namespace" - name := "name" - group := "$Default" - partitionID := "0" - dir := path.Join(os.TempDir(), RandomName("read", 4)) - persister, err := NewFilePersister(dir) - assert.Nil(t, err) - ckp, err := persister.Read(namespace, name, group, partitionID) - assert.Nil(t, err) - assert.Equal(t, NewCheckpointFromStartOfStream(), ckp) -} - -func TestFilePersister_Write(t *testing.T) { - namespace := "namespace" - name := "name" - group := "$Default" - partitionID := "0" - dir := path.Join(os.TempDir(), RandomName("write", 4)) - persister, err := NewFilePersister(dir) - assert.Nil(t, err) - ckp := NewCheckpoint("120", 22, time.Now()) - err = persister.Write(namespace, name, group, partitionID, ckp) - assert.Nil(t, err) - ckp2, err := persister.Read(namespace, name, group, partitionID) - assert.Nil(t, err) - assert.Equal(t, ckp.Offset, ckp2.Offset) - assert.Equal(t, ckp.SequenceNumber, ckp2.SequenceNumber) -} - -// RandomName generates a random Event Hub name -func RandomName(prefix string, length int) string { - b := make([]rune, length) - for i := range b { - b[i] = letterRunes[rand.Intn(len(letterRunes))] - } - return prefix + "-" + string(b) -} diff --git a/persist/persist.go b/persist/persist.go deleted file mode 100644 index 5f7d314..0000000 --- a/persist/persist.go +++ /dev/null @@ -1,81 +0,0 @@ -// Package persist provides abstract structures for checkpoint persistence. -package persist - -// MIT License -// -// Copyright (c) Microsoft Corporation. All rights reserved. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE - -import ( - "fmt" - "path" - "sync" -) - -type ( - // CheckpointPersister provides persistence for the received offset for a given namespace, hub name, consumer group, partition Id and - // offset so that if a receiver where to be interrupted, it could resume after the last consumed event. - CheckpointPersister interface { - Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error - Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error) - } - - // MemoryPersister is a default implementation of a Hub CheckpointPersister, which will persist offset information in - // memory. - MemoryPersister struct { - values map[string]Checkpoint - mu sync.Mutex - } -) - -// NewMemoryPersister creates a new in-memory storage for checkpoints -// -// MemoryPersister is only intended to be shared with EventProcessorHosts within the same process. This implementation -// is a toy. You should probably use the Azure Storage implementation or any other that provides durable storage for -// checkpoints. -func NewMemoryPersister() *MemoryPersister { - return &MemoryPersister{ - values: make(map[string]Checkpoint), - } -} - -func (p *MemoryPersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error { - p.mu.Lock() - defer p.mu.Unlock() - - key := getPersistenceKey(namespace, name, consumerGroup, partitionID) - p.values[key] = checkpoint - return nil -} - -func (p *MemoryPersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error) { - p.mu.Lock() - defer p.mu.Unlock() - - key := getPersistenceKey(namespace, name, consumerGroup, partitionID) - if offset, ok := p.values[key]; ok { - return offset, nil - } - return NewCheckpointFromStartOfStream(), fmt.Errorf("could not read the offset for the key %s", key) -} - -func getPersistenceKey(namespace, name, consumerGroup, partitionID string) string { - return path.Join(namespace, name, consumerGroup, partitionID) -} From 221baf6205a23fd41091824f40fdda942091177b Mon Sep 17 00:00:00 2001 From: David Justice Date: Wed, 29 May 2019 16:25:14 -0700 Subject: [PATCH 10/17] add change log entry --- changelog.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/changelog.md b/changelog.md index f2fd75d..7000c24 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,9 @@ # Change Log +## `v2.0.0` +- [**breaking change** remove persist and move into the Event Hubs package](https://github.com/Azure/azure-event-hubs-go/pull/112) +- **breaking change** remove log package in favor of https://github.com/devigned/tab + ## `v1.1.4` - allow status description on RPC calls to be empty without returning an error https://github.com/Azure/azure-event-hubs-go/issues/88 From ce8a381136508546dbc1f09f38d23ed7bef730ce Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 30 May 2019 14:28:17 -0700 Subject: [PATCH 11/17] move module to v2 --- aad/jwt.go | 3 ++- cbs/cbs.go | 6 +++--- go.mod | 2 +- internal/tracing/tracing.go | 2 +- internal/version.go | 2 +- rpc/rpc.go | 6 +++--- sas/sas.go | 4 ++-- 7 files changed, 13 insertions(+), 12 deletions(-) diff --git a/aad/jwt.go b/aad/jwt.go index 4a82451..c076eaf 100644 --- a/aad/jwt.go +++ b/aad/jwt.go @@ -33,10 +33,11 @@ import ( "strconv" "time" - "github.com/Azure/azure-amqp-common-go/auth" "github.com/Azure/go-autorest/autorest/adal" "github.com/Azure/go-autorest/autorest/azure" "golang.org/x/crypto/pkcs12" + + "github.com/Azure/azure-amqp-common-go/v2/auth" ) const ( diff --git a/cbs/cbs.go b/cbs/cbs.go index b1a6c6a..6373fc1 100644 --- a/cbs/cbs.go +++ b/cbs/cbs.go @@ -32,9 +32,9 @@ import ( "github.com/devigned/tab" "pack.ag/amqp" - "github.com/Azure/azure-amqp-common-go/auth" - "github.com/Azure/azure-amqp-common-go/internal/tracing" - "github.com/Azure/azure-amqp-common-go/rpc" + "github.com/Azure/azure-amqp-common-go/v2/auth" + "github.com/Azure/azure-amqp-common-go/v2/internal/tracing" + "github.com/Azure/azure-amqp-common-go/v2/rpc" ) const ( diff --git a/go.mod b/go.mod index 4076fbf..76f87bf 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/Azure/azure-amqp-common-go +module github.com/Azure/azure-amqp-common-go/v2 require ( contrib.go.opencensus.io/exporter/ocagent v0.5.0 // indirect diff --git a/internal/tracing/tracing.go b/internal/tracing/tracing.go index 8a03355..0ede73a 100644 --- a/internal/tracing/tracing.go +++ b/internal/tracing/tracing.go @@ -6,7 +6,7 @@ import ( "github.com/devigned/tab" - "github.com/Azure/azure-amqp-common-go/internal" + "github.com/Azure/azure-amqp-common-go/v2/internal" ) // StartSpanFromContext starts a span given a context and applies common library information diff --git a/internal/version.go b/internal/version.go index 2708014..42eecb7 100644 --- a/internal/version.go +++ b/internal/version.go @@ -2,5 +2,5 @@ package common const ( // Version is the semantic version of the library - Version = "1.1.4" + Version = "2.0.0" ) diff --git a/rpc/rpc.go b/rpc/rpc.go index 8f57edc..a98eb10 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -34,9 +34,9 @@ import ( "github.com/devigned/tab" "pack.ag/amqp" - "github.com/Azure/azure-amqp-common-go" - "github.com/Azure/azure-amqp-common-go/internal/tracing" - "github.com/Azure/azure-amqp-common-go/uuid" + "github.com/Azure/azure-amqp-common-go/v2" + "github.com/Azure/azure-amqp-common-go/v2/internal/tracing" + "github.com/Azure/azure-amqp-common-go/v2/uuid" ) const ( diff --git a/sas/sas.go b/sas/sas.go index 60901cb..101fd64 100644 --- a/sas/sas.go +++ b/sas/sas.go @@ -36,8 +36,8 @@ import ( "strings" "time" - "github.com/Azure/azure-amqp-common-go/auth" - "github.com/Azure/azure-amqp-common-go/conn" + "github.com/Azure/azure-amqp-common-go/v2/auth" + "github.com/Azure/azure-amqp-common-go/v2/conn" ) type ( From 260eef350889401efb2f39e96bb65b14450628dd Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 30 May 2019 14:52:32 -0700 Subject: [PATCH 12/17] update travis --- .travis.yml | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index a611058..387a2d9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,19 +2,22 @@ language: go sudo: false go: - 1.x - - tip + - 1.11.x env: - GO111MODULE=on matrix: - allow_failures: - - go: tip fast_finish: true before_install: - - go get github.com/fzipp/gocyclo - - go get honnef.co/go/tools/cmd/megacheck + - cd ${TRAVIS_HOME} + - go get github.com/mattn/goveralls + - go get golang.org/x/tools/cmd/cover + - go get github.com/fzipp/gocyclo + - go get golang.org/x/lint/golint + - cd ${TRAVIS_BUILD_DIR} script: - make test \ No newline at end of file + - export GO111MODULE=on + - make test \ No newline at end of file From a63266062f526b78a59936a0651a2ae09f08b851 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 30 May 2019 14:53:13 -0700 Subject: [PATCH 13/17] update travis --- .travis.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 387a2d9..bb9bfb1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,9 +4,6 @@ go: - 1.x - 1.11.x -env: - - GO111MODULE=on - matrix: fast_finish: true From 3398b0c9aff38e96213b28a06231364b848dad66 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 30 May 2019 14:56:14 -0700 Subject: [PATCH 14/17] update travis --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index bb9bfb1..f3f2a01 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,5 +16,4 @@ before_install: - cd ${TRAVIS_BUILD_DIR} script: - - export GO111MODULE=on - make test \ No newline at end of file From 0215fc177b3d4d765b9c594bda6744edc8fc4c5d Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 30 May 2019 14:59:59 -0700 Subject: [PATCH 15/17] update travis --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index f3f2a01..bb9bfb1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,4 +16,5 @@ before_install: - cd ${TRAVIS_BUILD_DIR} script: + - export GO111MODULE=on - make test \ No newline at end of file From 567e94ecda78526f636b74390bdc662aba3a583a Mon Sep 17 00:00:00 2001 From: David Justice Date: Fri, 31 May 2019 11:15:52 -0700 Subject: [PATCH 16/17] update the readme and the mod file for go version --- README.md | 16 ++++++++++++---- go.mod | 2 ++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 277140b..4ea4650 100644 --- a/README.md +++ b/README.md @@ -4,15 +4,23 @@ [![Build Status](https://travis-ci.org/Azure/azure-amqp-common-go.svg?branch=master)](https://travis-ci.org/Azure/azure-amqp-common-go) This project contains reusable components for AMQP based services like Event Hub and Service Bus. You will find -abstractions over authentication, claims-based security, connection string parsing, checkpointing and RPC for AMQP. +abstractions over authentication, claims-based security, connection string parsing and RPC for AMQP. -If you are looking for the Azure Event Hub library for go, you can find it [here](https://github.com/Azure/azure-event-hubs-go). +If you are looking for the Azure Event Hub library for go, you can find it [here](https://aka.ms/azure-event-hubs-go). + +If you are looking for the Azure Service Bus library for go, you can find it [here](https://aka.ms/azure-service-bus-go). ## Install +If you want to use stable versions of the library, please use Go modules. -### via go get +### Using go get with Go modules targeting version 2.x.x ``` bash -go get github.com/Azure/azure-amqp-common-go +go get -u github.com/Azure/azure-amqp-common-go/v2 +``` + +### Using go get with Go modules targeting version 1.x.x +``` bash +go get -u github.com/Azure/azure-amqp-common-go ``` ## Contributing diff --git a/go.mod b/go.mod index 76f87bf..83796ae 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/Azure/azure-amqp-common-go/v2 +go 1.12 + require ( contrib.go.opencensus.io/exporter/ocagent v0.5.0 // indirect github.com/Azure/azure-sdk-for-go v29.0.0+incompatible // indirect From 8b39f5ce4f11e99edf5036543354e84b6bcc9c3f Mon Sep 17 00:00:00 2001 From: David Justice Date: Wed, 5 Jun 2019 15:24:09 -0700 Subject: [PATCH 17/17] clean up readme a bit --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 4ea4650..3f4ec80 100644 --- a/README.md +++ b/README.md @@ -10,15 +10,15 @@ If you are looking for the Azure Event Hub library for go, you can find it [here If you are looking for the Azure Service Bus library for go, you can find it [here](https://aka.ms/azure-service-bus-go). -## Install +## Install with Go modules If you want to use stable versions of the library, please use Go modules. -### Using go get with Go modules targeting version 2.x.x +### Using go get targeting version 2.x.x ``` bash go get -u github.com/Azure/azure-amqp-common-go/v2 ``` -### Using go get with Go modules targeting version 1.x.x +### Using go get targeting version 1.x.x ``` bash go get -u github.com/Azure/azure-amqp-common-go ```