fix event propigation and add msg modify

This commit is contained in:
David Justice 2018-05-16 11:39:14 -07:00
Родитель 5b9a0d2ea5
Коммит 12e0cdbea2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
7 изменённых файлов: 121 добавлений и 34 удалений

51
Gopkg.lock сгенерированный
Просмотреть файл

@ -15,8 +15,8 @@
"sas",
"uuid"
]
revision = "7c6d990e663e3d68d2e7db38a2ede690bb9ad002"
version = "v0.4.0"
revision = "8d02d4e77aba80165452199bb9bb24a82eb3b219"
version = "v0.5.0"
[[projects]]
name = "github.com/Azure/azure-sdk-for-go"
@ -42,6 +42,12 @@
revision = "eaa7994b2278094c904d31993d26f56324db3052"
version = "v10.8.1"
[[projects]]
branch = "master"
name = "github.com/codahale/hdrhistogram"
packages = ["."]
revision = "3a0bb77429bd3a61596f5e8a3172445844342120"
[[projects]]
name = "github.com/davecgh/go-spew"
packages = ["spew"]
@ -87,23 +93,52 @@
version = "v1.2.1"
[[projects]]
branch = "master"
name = "golang.org/x/net"
packages = ["context"]
revision = "f73e4c9ed3b7ebdd5f699a16a880c2b1994e50dd"
name = "github.com/uber/jaeger-client-go"
packages = [
".",
"config",
"internal/baggage",
"internal/baggage/remote",
"internal/spanlog",
"internal/throttler",
"internal/throttler/remote",
"log",
"rpcmetrics",
"thrift",
"thrift-gen/agent",
"thrift-gen/baggage",
"thrift-gen/jaeger",
"thrift-gen/sampling",
"thrift-gen/zipkincore",
"utils"
]
revision = "b043381d944715b469fd6b37addfd30145ca1758"
version = "v2.14.0"
[[projects]]
name = "github.com/uber/jaeger-lib"
packages = ["metrics"]
revision = "ed3a127ec5fef7ae9ea95b01b542c47fbd999ce5"
version = "v1.5.0"
[[projects]]
branch = "master"
name = "golang.org/x/net"
packages = ["context"]
revision = "2491c5de3490fced2f6cff376127c667efeed857"
[[projects]]
name = "pack.ag/amqp"
packages = [
".",
"internal/testconn"
]
revision = "3714ffb325b31a3619d1ac2cdf678fa51ec446ef"
revision = "390d7eaeac0368026b93a13f4f7bc30d903c61cd"
version = "v0.5.0"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "650c8d9022e6e985cd93762ec02122314aa6daeeb2b083b3b3ba3c45a5c64549"
inputs-digest = "b881f99847eeeaaa110edf7e17a56a5da4bcf7d05c4252cf1521caca57b1c4dc"
solver-name = "gps-cdcl"
solver-version = 1

Просмотреть файл

@ -5,7 +5,7 @@
[[constraint]]
name = "pack.ag/amqp"
branch = "master"
version = "0.5"
[[constraint]]
name = "github.com/Azure/azure-sdk-for-go"
@ -13,4 +13,4 @@
[[constraint]]
name = "github.com/Azure/azure-amqp-common-go"
version = "0.4"
version = "0.5"

Просмотреть файл

@ -27,7 +27,7 @@ import (
)
type (
// Event is an Event Hubs message to be sent or received
// Event is an Service Bus message to be sent or received
Event struct {
Data []byte
Properties map[string]interface{}
@ -36,13 +36,6 @@ type (
GroupSequence *uint32
message *amqp.Message
}
// EventBatch is a batch of Event Hubs messages to be sent
EventBatch struct {
Events []*Event
Properties map[string]interface{}
ID string
}
)
// NewEventFromString builds an Event from a string message
@ -57,13 +50,6 @@ func NewEvent(data []byte) *Event {
}
}
// NewEventBatch builds an EventBatch from an array of Events
func NewEventBatch(events []*Event) *EventBatch {
return &EventBatch{
Events: events,
}
}
// Set implements opentracing.TextMapWriter and sets properties on the event to be propagated to the message broker
func (e *Event) Set(key, value string) {
if e.Properties == nil {
@ -73,9 +59,9 @@ func (e *Event) Set(key, value string) {
}
// ForeachKey implements the opentracing.TextMapReader and gets properties on the event to be propagated from the message broker
func (e *Event) ForeachKey(handler func(key string, val interface{}) error) error {
func (e *Event) ForeachKey(handler func(key, val string) error) error {
for key, value := range e.Properties {
err := handler(key, value)
err := handler(key, value.(string))
if err != nil {
return err
}
@ -127,7 +113,10 @@ func newEvent(data []byte, msg *amqp.Message) *Event {
}
if msg != nil {
event.Properties = msg.ApplicationProperties
event.Properties = make(map[string]interface{})
for key, value := range msg.ApplicationProperties {
event.Properties[key] = value
}
}
return event
}

Просмотреть файл

@ -23,17 +23,21 @@ package test
// SOFTWARE
import (
"context"
"io"
"math/rand"
"os"
"time"
"context"
rm "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
sbmgmt "github.com/Azure/azure-sdk-for-go/services/servicebus/mgmt/2017-04-01/servicebus"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/stretchr/testify/suite"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
jaegerlog "github.com/uber/jaeger-client-go/log"
)
const (
@ -54,6 +58,7 @@ type (
Token *adal.ServicePrincipalToken
Environment azure.Environment
TagID string
closer io.Closer
}
)
@ -76,6 +81,7 @@ func (suite *BaseSuite) SetupSuite() {
suite.Token = suite.servicePrincipalToken()
suite.Environment = azure.PublicCloud
suite.TagID = RandomString("tag", 10)
suite.setupTracing()
err := suite.ensureProvisioned(sbmgmt.SkuTierStandard)
if err != nil {
@ -85,7 +91,9 @@ func (suite *BaseSuite) SetupSuite() {
// TearDownSuite destroys created resources during the run of the suite
func (suite *BaseSuite) TearDownSuite() {
// tear down queues and subscriptions maybe??
if suite.closer != nil {
_ = suite.closer.Close()
}
}
func mustGetEnv(key string) string {
@ -155,6 +163,36 @@ func (suite *BaseSuite) ensureProvisioned(tier sbmgmt.SkuTier) error {
return nil
}
func (suite *BaseSuite) setupTracing() error {
if os.Getenv("TRACING") == "true" {
// Sample configuration for testing. Use constant sampling to sample every trace
// and enable LogSpan to log every span via configured Logger.
cfg := config.Configuration{
Sampler: &config.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
Reporter: &config.ReporterConfig{
LocalAgentHostPort: "0.0.0.0:6831",
},
}
// Example logger and metrics factory. Use github.com/uber/jaeger-client-go/log
// and github.com/uber/jaeger-lib/metrics respectively to bind to real logging and metrics
// frameworks.
jLogger := jaegerlog.StdLogger
closer, err := cfg.InitGlobalTracer(
"ehtests",
config.Logger(jLogger),
)
suite.closer = closer
return err
}
return nil
}
// RandomName generates a random Event Hub name tagged with the suite id
func (suite *BaseSuite) RandomName(prefix string, length int) string {
return RandomString(prefix, length) + "-" + suite.TagID

Просмотреть файл

@ -53,9 +53,11 @@ func TestServiceBusSuite(t *testing.T) {
// TearDownSuite destroys created resources during the run of the suite
func (suite *serviceBusSuite) TearDownSuite() {
suite.BaseSuite.TearDownSuite()
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
suite.deleteAllTaggedQueues(ctx)
suite.deleteAllTaggedTopics(ctx)
}
func (suite *serviceBusSuite) deleteAllTaggedQueues(ctx context.Context) {
@ -77,6 +79,25 @@ func (suite *serviceBusSuite) deleteAllTaggedQueues(ctx context.Context) {
}
}
func (suite *serviceBusSuite) deleteAllTaggedTopics(ctx context.Context) {
ns := suite.getNewSasInstance()
tm := ns.NewTopicManager()
feed, err := tm.List(ctx)
if err != nil {
suite.T().Fatal(err)
}
for _, entry := range feed.Entries {
if strings.HasSuffix(entry.Title, suite.TagID) {
err := tm.Delete(ctx, entry.Title)
if err != nil {
suite.T().Fatal(err)
}
}
}
}
func (suite *serviceBusSuite) getNewSasInstance() *Namespace {
ns, err := getNewSasInstance(suite.ConnStr)
if err != nil {

Просмотреть файл

@ -127,7 +127,7 @@ func (r *receiver) handleMessages(ctx context.Context, messages chan *amqp.Messa
func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler Handler) {
event := eventFromMsg(msg)
var span opentracing.Span
wireContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, event)
wireContext, err := extractWireContext(event)
if err == nil {
span, ctx = r.startConsumerSpanFromWire(ctx, "sb.receiver.handleMessage", wireContext)
} else {
@ -140,13 +140,17 @@ func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler
err = handler(ctx, event)
if err != nil {
msg.Reject()
log.For(ctx).Error(fmt.Errorf("message rejected: id: %v", id))
msg.Modify(true, false, nil)
log.For(ctx).Error(fmt.Errorf("message modify(true, false, nil): id: %v", id))
return
}
msg.Accept()
}
func extractWireContext(reader opentracing.TextMapReader) (opentracing.SpanContext, error) {
return opentracing.GlobalTracer().Extract(opentracing.TextMap, reader)
}
func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Message) {
span, ctx := r.startConsumerSpanFromContext(ctx, "sb.receiver.listenForMessages")
defer span.Finish()

Просмотреть файл

@ -94,7 +94,7 @@ func (t *Topic) NewSubscriptionManager() *SubscriptionManager {
}
// NewSubscriptionManager creates a new SubscriptionManger for a Service Bus Namespace
func (ns *Namespace) NewSubscriptionManager(topicName, name string) *SubscriptionManager {
func (ns *Namespace) NewSubscriptionManager(topicName string) *SubscriptionManager {
t := ns.NewTopic(topicName)
return &SubscriptionManager{
EntityManager: NewEntityManager(t.namespace.getHTTPSHostURI(), t.namespace.TokenProvider),