This commit is contained in:
David Justice 2018-06-15 10:20:30 -07:00
Родитель 68953c9c03
Коммит 4faa09c629
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
7 изменённых файлов: 368 добавлений и 193 удалений

18
Gopkg.lock сгенерированный
Просмотреть файл

@ -39,8 +39,8 @@
"autorest/to",
"autorest/validation"
]
revision = "d21db7ee8958d471f5b185d700762c903549ee08"
version = "v10.10.0"
revision = "76796dcb80ab6491bf22e344402023c081a7a282"
version = "v10.11.1"
[[projects]]
branch = "master"
@ -60,6 +60,12 @@
revision = "06ea1031745cb8b3dab3f6a236daf2b0aa468b7e"
version = "v3.2.0"
[[projects]]
branch = "master"
name = "github.com/mitchellh/mapstructure"
packages = ["."]
revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b"
[[projects]]
name = "github.com/opentracing/opentracing-go"
packages = [
@ -89,8 +95,8 @@
"require",
"suite"
]
revision = "12b6f73e6084dad08a7c6e575284b177ecafbc71"
version = "v1.2.1"
revision = "f35b8ab0b5a2cef36673838d662e249dd9c94686"
version = "v1.2.2"
[[projects]]
name = "github.com/uber/jaeger-client-go"
@ -125,7 +131,7 @@
branch = "master"
name = "golang.org/x/net"
packages = ["context"]
revision = "1e491301e022f8f977054da4c2d852decd59571f"
revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196"
[[projects]]
name = "pack.ag/amqp"
@ -139,6 +145,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "b881f99847eeeaaa110edf7e17a56a5da4bcf7d05c4252cf1521caca57b1c4dc"
inputs-digest = "6bae36bbcba90590521ac949fd4f7bae7cbfa2af26e2426f3802bc842409ca18"
solver-name = "gps-cdcl"
solver-version = 1

Просмотреть файл

@ -5,7 +5,7 @@
[[constraint]]
name = "pack.ag/amqp"
version = "0.5"
version = "0.5.1"
[[constraint]]
name = "github.com/Azure/azure-sdk-for-go"

Просмотреть файл

@ -24,29 +24,34 @@ package servicebus
import (
"context"
"fmt"
"time"
"github.com/Azure/go-autorest/autorest/to"
"github.com/mitchellh/mapstructure"
"pack.ag/amqp"
)
type (
// Message is an Service Bus message to be sent or received
Message struct {
ContentType string
CorrelationID string
Data []byte
DeliveryCount uint32
GroupID *string
GroupSequence *uint32
ID string
Label string
PartitionKey string
Properties map[string]interface{}
ReplyTo string
ReplyToGroupID string
To string
TTL time.Duration
message *amqp.Message
ContentType string
CorrelationID string
Data []byte
DeliveryCount uint32
GroupID *string
GroupSequence *uint32
ID string
Label string
PartitionKey string
ReplyTo string
ReplyToGroupID string
To string
TTL time.Duration
LockToken *string
SystemProperties *SystemProperties
UserProperties map[string]interface{}
message *amqp.Message
}
// DispositionAction represents the action to notify Azure Service Bus of the Message's disposition
@ -54,6 +59,29 @@ type (
// MessageErrorCondition represents a well-known collection of AMQP errors
MessageErrorCondition string
deliveryAnnotations struct {
LockToken *amqp.UUID `mapstructure:"x-opt-lock-token"`
}
// SystemProperties are used to store properties that are set by the system.
SystemProperties struct {
LockedUntil *time.Time `mapstructure:"x-opt-locked-until"`
SequenceNumber *int64 `mapstructure:"x-opt-sequence-number"`
PartitionID *int16 `mapstructure:"x-opt-partition-id"`
PartitionKey *string `mapstructure:"x-opt-partition-key"`
EnqueuedTime *time.Time `mapstructure:"x-opt-enqueued-time"`
DeadLetterSource *string `mapstructure:"x-opt-deadletter-source"`
ScheduledEnqueuedTime *time.Time `mapstructure:"x-opt-scheduled-enqueued-time"`
EnqueuedSequenceNumber *int64 `mapstructure:"x-opt-enqueue-sequence-number"`
ViaPartitionKey *string `mapstructure:"x-opt-via-partition-key"`
}
// MessageWithContext is a Service Bus message with its context which propagates the distributed trace information
MessageWithContext struct {
*Message
Ctx context.Context
}
)
// Error Conditions
@ -73,22 +101,29 @@ const (
)
const (
//Vendor = "com.microsoft"
//EnqueueTimeUTCName = "x-opt-enqueue-time"
//ScheduledEnqueueTimeUTCName = "x-opt-scheduled-enqueue-time"
//SequenceNumberName = "x-opt-sequence-number"
//OffsetName = "x-opt-offset"
//LockedUntilName = "x-opt-locked-until"
//PublisherName = "x-opt-publisher"
//PartitionKeyName = "x-opt-partition-key"
//PartitionIDName = "x-opt-partition-id"
//ViaPartitionKeyName = "x-opt-via-partition-key"
//DeadLetterSourceName = "x-opt-deadletter-source"
//TimeSpanName = Vendor + ":timespan"
//UriName = Vendor + ":uri"
//DateTimeOffsetName = Vendor + ":datetime-offset"
lockTokenName = "x-opt-lock-token"
)
// Complete will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue
func (m *MessageWithContext) Complete() {
m.Message.Complete()(m.Ctx)
}
// Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery.
func (m *MessageWithContext) Abandon() {
m.Message.Abandon()(m.Ctx)
}
// DeadLetter will notify Azure Service Bus the message failed and should not re-queued
func (m *MessageWithContext) DeadLetter(err error) {
m.Message.DeadLetter(err)(m.Ctx)
}
// DeadLetterWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional context
func (m *MessageWithContext) DeadLetterWithInfo(err error, condition MessageErrorCondition, additionalData map[string]string) {
m.Message.DeadLetterWithInfo(err, condition, additionalData)(m.Ctx)
}
// NewMessageFromString builds an Message from a string message
func NewMessageFromString(message string) *Message {
return NewMessage([]byte(message))
@ -157,7 +192,8 @@ func (m *Message) DeadLetter(err error) DispositionAction {
}
}
// DeadLetterWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional context
// DeadLetterWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional
// context
func (m *Message) DeadLetterWithInfo(err error, condition MessageErrorCondition, additionalData map[string]string) DispositionAction {
var info map[string]interface{}
if additionalData != nil {
@ -182,15 +218,15 @@ func (m *Message) DeadLetterWithInfo(err error, condition MessageErrorCondition,
// Set implements opentracing.TextMapWriter and sets properties on the event to be propagated to the message broker
func (m *Message) Set(key, value string) {
if m.Properties == nil {
m.Properties = make(map[string]interface{})
if m.UserProperties == nil {
m.UserProperties = make(map[string]interface{})
}
m.Properties[key] = value
m.UserProperties[key] = value
}
// ForeachKey implements the opentracing.TextMapReader and gets properties on the event to be propagated from the message broker
func (m *Message) ForeachKey(handler func(key, val string) error) error {
for key, value := range m.Properties {
for key, value := range m.UserProperties {
err := handler(key, value.(string))
if err != nil {
return err
@ -220,26 +256,37 @@ func (m *Message) toMsg() *amqp.Message {
amqpMsg.Properties.ReplyTo = m.ReplyTo
amqpMsg.Properties.ReplyToGroupID = m.ReplyToGroupID
if len(m.Properties) > 0 {
if len(m.UserProperties) > 0 {
amqpMsg.ApplicationProperties = make(map[string]interface{})
for key, value := range m.Properties {
for key, value := range m.UserProperties {
amqpMsg.ApplicationProperties[key] = value
}
}
if m.LockToken != nil {
if amqpMsg.DeliveryAnnotations == nil {
amqpMsg.DeliveryAnnotations = make(amqp.Annotations)
}
amqpMsg.DeliveryAnnotations[lockTokenName] = m.LockToken
}
return amqpMsg
}
func messageFromAMQPMessage(msg *amqp.Message) *Message {
func messageFromAMQPMessage(msg *amqp.Message) (*Message, error) {
return newMessage(msg.Data[0], msg)
}
func newMessage(data []byte, amqpMsg *amqp.Message) *Message {
func newMessage(data []byte, amqpMsg *amqp.Message) (*Message, error) {
msg := &Message{
Data: data,
message: amqpMsg,
}
if amqpMsg == nil {
return msg, nil
}
if amqpMsg.Properties != nil {
if id, ok := amqpMsg.Properties.MessageID.(string); ok {
msg.ID = id
@ -254,15 +301,26 @@ func newMessage(data []byte, amqpMsg *amqp.Message) *Message {
msg.To = amqpMsg.Properties.To
msg.ReplyTo = amqpMsg.Properties.ReplyTo
msg.ReplyToGroupID = amqpMsg.Properties.ReplyToGroupID
msg.DeliveryCount = amqpMsg.Header.DeliveryCount
msg.DeliveryCount = amqpMsg.Header.DeliveryCount + 1
msg.TTL = amqpMsg.Header.TTL
}
if amqpMsg != nil {
msg.Properties = make(map[string]interface{})
for key, value := range amqpMsg.ApplicationProperties {
msg.Properties[key] = value
if amqpMsg.Annotations != nil {
if err := mapstructure.Decode(amqpMsg.Annotations, &msg.SystemProperties); err != nil {
return msg, err
}
}
return msg
if amqpMsg.DeliveryAnnotations != nil {
var da deliveryAnnotations
if err := mapstructure.Decode(amqpMsg.DeliveryAnnotations, &da); err != nil {
fmt.Println("ERROR!!", err.Error())
return msg, err
}
if da.LockToken != nil {
msg.LockToken = to.StringPtr(da.LockToken.String())
}
}
return msg, nil
}

Просмотреть файл

@ -220,6 +220,15 @@ func QueueEntityWithLockDuration(window *time.Duration) QueueManagementOption {
}
}
// QueueEntityWithMaxDeliveryCount configures the queue to have a maximum number of delivery attempts before
// dead-lettering the message
func QueueEntityWithMaxDeliveryCount(count int32) QueueManagementOption {
return func(q *QueueDescription) error {
q.MaxDeliveryCount = &count
return nil
}
}
// NewQueueManager creates a new QueueManager for a Service Bus Namespace
func (ns *Namespace) NewQueueManager() *QueueManager {
return &QueueManager{
@ -371,13 +380,13 @@ func QueueWithReceiveAndDelete() QueueOption {
}
}
// QueueWithRequiredSession configures a queue to use a session
func QueueWithRequiredSession(sessionID string) QueueOption {
return func(q *Queue) error {
q.requiredSessionID = &sessionID
return nil
}
}
//// QueueWithRequiredSession configures a queue to use a session
//func QueueWithRequiredSession(sessionID string) QueueOption {
// return func(q *Queue) error {
// q.requiredSessionID = &sessionID
// return nil
// }
//}
// NewQueue creates a new Queue Sender / Receiver
func (ns *Namespace) NewQueue(ctx context.Context, name string, opts ...QueueOption) (*Queue, error) {
@ -415,8 +424,16 @@ func (q *Queue) Send(ctx context.Context, event *Message) error {
}
// ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.
func (q *Queue) ReceiveOne(ctx context.Context) (*Message, error) {
return nil, nil
func (q *Queue) ReceiveOne(ctx context.Context) (*MessageWithContext, error) {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.ReceiveOne")
defer span.Finish()
err := q.ensureReceiver(ctx)
if err != nil {
return nil, err
}
return q.receiver.ReceiveOne(ctx)
}
// Receive subscribes for messages sent to the Queue

Просмотреть файл

@ -31,9 +31,9 @@ import (
"testing"
"time"
"github.com/Azure/azure-amqp-common-go/uuid"
"github.com/Azure/azure-sdk-for-go/services/servicebus/mgmt/2015-08-01/servicebus"
"github.com/Azure/azure-sdk-for-go/services/servicebus/mgmt/2015-08-01/servicebus"
"github.com/Azure/azure-service-bus-go/internal/test"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
)
@ -343,12 +343,14 @@ func buildQueue(ctx context.Context, t *testing.T, qm *QueueManager, name string
return q
}
func (suite *serviceBusSuite) TestQueue() {
func (suite *serviceBusSuite) TestQueueClient() {
tests := map[string]func(context.Context, *testing.T, *Queue){
"SimpleSend": testQueueSend,
"SendAndReceiveInOrder": testQueueSendAndReceiveInOrder,
"DuplicateDetection": testDuplicateDetection,
"MessageProperties": testQueueMessageProperties,
"MessageProperties": testMessageProperties,
"Retry": testRequeueOnFail,
"ReceiveOne": testReceiveOne,
}
ns := suite.getNewSasInstance()
@ -357,11 +359,11 @@ func (suite *serviceBusSuite) TestQueue() {
queueName := suite.randEntityName()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
window := 3 * time.Minute
dupWindow := 30 * time.Second
cleanup := makeQueue(ctx, t, ns, queueName,
QueueEntityWithPartitioning(),
QueueEntityWithDuplicateDetection(&window),
QueueEntityWithLockDuration(&window))
QueueEntityWithDuplicateDetection(&dupWindow),
QueueEntityWithMaxDeliveryCount(10))
q, err := ns.NewQueue(ctx, queueName)
suite.NoError(err)
defer func() {
@ -378,7 +380,48 @@ func (suite *serviceBusSuite) TestQueue() {
}
}
func testQueueMessageProperties(ctx context.Context, t *testing.T, q *Queue) {
func testReceiveOne(ctx context.Context, t *testing.T, q *Queue) {
if assert.NoError(t, q.Send(ctx, NewMessageFromString("Hello World!"))) {
messageWithContext, err := q.ReceiveOne(ctx)
if assert.NoError(t, err) {
span, _ := opentracing.StartSpanFromContext(messageWithContext.Ctx, "continue-message-span")
defer span.Finish()
messageWithContext.Complete()
}
}
}
func testRequeueOnFail(ctx context.Context, t *testing.T, q *Queue) {
if assert.NoError(t, q.Send(ctx, NewMessageFromString("Hello World!"))) {
var wg sync.WaitGroup
wg.Add(2)
var receivedMsg *Message
fail := true
listenHandle, err := q.Receive(context.Background(),
func(ctx context.Context, msg *Message) DispositionAction {
receivedMsg = msg
defer func() {
wg.Done()
}()
if fail {
fail = false
return msg.Abandon()
}
return msg.Complete()
})
if assert.NoError(t, err) {
defer listenHandle.Close(ctx)
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
if assert.NoError(t, err) {
assert.EqualValues(t, 2, receivedMsg.DeliveryCount)
}
}
}
}
func testMessageProperties(ctx context.Context, t *testing.T, q *Queue) {
if assert.NoError(t, q.Send(ctx, NewMessageFromString("Hello World!"))) {
var wg sync.WaitGroup
wg.Add(1)
@ -397,12 +440,13 @@ func testQueueMessageProperties(ctx context.Context, t *testing.T, q *Queue) {
waitUntil(t, &wg, time.Until(end))
if assert.NoError(t, err) {
t.Log(receivedMsg.TTL, receivedMsg.DeliveryCount, receivedMsg.ID)
assert.NotNil(t, receivedMsg.ID)
assert.NotNil(t, receivedMsg.TTL)
assert.EqualValues(t, 0, receivedMsg.DeliveryCount)
assert.NotNil(t, receivedMsg.GroupSequence)
assert.NotNil(t, receivedMsg.GroupID)
sp := receivedMsg.SystemProperties
assert.NotNil(t, sp.LockedUntil, "LockedUntil")
assert.NotNil(t, sp.EnqueuedSequenceNumber, "EnqueuedSequenceNumber")
assert.NotNil(t, sp.EnqueuedTime, "EnqueuedTime")
assert.NotNil(t, sp.SequenceNumber, "SequenceNumber")
assert.NotNil(t, sp.PartitionID, "PartitionID")
assert.NotNil(t, sp.PartitionKey, "PartitionKey")
}
}
}
@ -442,46 +486,46 @@ func testQueueSendAndReceiveInOrder(ctx context.Context, t *testing.T, queue *Qu
}
func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
dupID := mustUUID(t)
messages := []struct {
ID string
Data string
}{
{
ID: dupID.String(),
Data: "hello 1!",
},
{
ID: dupID.String(),
Data: "hello 1!",
},
{
ID: mustUUID(t).String(),
Data: "hello 2!",
},
messages := []*Message{
NewMessageFromString("hello, "),
NewMessageFromString("world!"),
}
messages[0].ID = "foo"
messages[1].ID = "bar"
for _, msg := range messages {
m := NewMessageFromString(msg.Data)
m.ID = msg.ID
err := queue.Send(ctx, m)
if err != nil {
t.Fatal(err)
if !assert.NoError(t, queue.Send(ctx, msg)) {
t.FailNow()
}
}
// send dup
if !assert.NoError(t, queue.Send(ctx, messages[0])) {
t.FailNow()
}
var wg sync.WaitGroup
wg.Add(2)
received := make(map[interface{}]string)
queue.Receive(ctx, func(ctx context.Context, event *Message) DispositionAction {
// we should get 2 messages discarding the duplicate ID
received[event.ID] = string(event.Data)
wg.Done()
return event.Complete()
var all []*Message
queue.Receive(ctx, func(ctx context.Context, message *Message) DispositionAction {
all = append(all, message)
if _, ok := received[message.ID]; !ok {
// caught a new one
defer wg.Done()
received[message.ID] = string(message.Data)
} else {
// caught a dup
assert.Fail(t, "received a duplicate message")
for _, item := range all {
t.Logf("mID: %q, gID: %q, gSeq: %q, lockT: %q", item.ID, *item.GroupID, *item.GroupSequence, *item.LockToken)
}
}
return message.Complete()
})
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
assert.Equal(t, 2, len(received), "should not have more than 2 messages", received)
}
func (suite *serviceBusSuite) TestQueueWithReceiveAndDelete() {
@ -537,87 +581,87 @@ func testQueueSendAndReceiveWithReceiveAndDelete(ctx context.Context, t *testing
waitUntil(t, &wg, time.Until(end))
}
func (suite *serviceBusSuite) TestQueueWithRequiredSessions() {
tests := map[string]func(context.Context, *testing.T, *Queue){
"TestSendAndReceiveSession": testQueueWithRequiredSessionSendAndReceive,
}
for name, testFunc := range tests {
setupTestTeardown := func(t *testing.T) {
ns := suite.getNewSasInstance()
queueName := suite.randEntityName()
sessionID := mustUUID(t).String()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
cleanup := makeQueue(ctx, t, ns, queueName,
QueueEntityWithPartitioning(),
QueueEntityWithRequiredSessions())
q, err := ns.NewQueue(ctx, queueName, QueueWithRequiredSession(sessionID))
if suite.NoError(err) {
testFunc(ctx, t, q)
if !t.Failed() {
checkZeroQueueMessages(ctx, t, ns, queueName)
}
}
defer func() {
if q != nil {
q.Close(ctx)
}
cancel()
cleanup()
}()
}
suite.T().Run(name, setupTestTeardown)
}
}
func testQueueWithRequiredSessionSendAndReceive(ctx context.Context, t *testing.T, queue *Queue) {
numMessages := rand.Intn(100) + 20
messages := make([]string, numMessages)
for i := 0; i < numMessages; i++ {
messages[i] = test.RandomString("hello", 10)
}
for _, message := range messages {
err := queue.Send(ctx, NewMessageFromString(message))
if err != nil {
t.Fatal(err)
}
}
var wg sync.WaitGroup
wg.Add(numMessages)
// ensure in-order processing of messages from the queue
count := 0
handler := func(ctx context.Context, event *Message) DispositionAction {
if !assert.Equal(t, messages[count], string(event.Data)) {
assert.FailNow(t, fmt.Sprintf("message %d %q didn't match %q", count, messages[count], string(event.Data)))
}
count++
wg.Done()
return event.Complete()
}
listenHandle, err := queue.Receive(ctx, handler)
if err != nil {
t.Fatal(err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
listenHandle.Close(ctx)
}()
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
}
func mustUUID(t *testing.T) uuid.UUID {
id, err := uuid.NewV4()
if err != nil {
t.Fatal(err)
}
return id
}
//func (suite *serviceBusSuite) TestQueueWithRequiredSessions() {
// tests := map[string]func(context.Context, *testing.T, *Queue){
// "TestSendAndReceiveSession": testQueueWithRequiredSessionSendAndReceive,
// }
//
// for name, testFunc := range tests {
// setupTestTeardown := func(t *testing.T) {
// ns := suite.getNewSasInstance()
// queueName := suite.randEntityName()
// sessionID := mustUUID(t).String()
// ctx, cancel := context.WithTimeout(context.Background(), timeout)
// cleanup := makeQueue(ctx, t, ns, queueName,
// QueueEntityWithPartitioning(),
// QueueEntityWithRequiredSessions())
// q, err := ns.NewQueue(ctx, queueName, QueueWithRequiredSession(sessionID))
// if suite.NoError(err) {
// testFunc(ctx, t, q)
// if !t.Failed() {
// checkZeroQueueMessages(ctx, t, ns, queueName)
// }
// }
// defer func() {
// if q != nil {
// q.Close(ctx)
// }
// cancel()
// cleanup()
// }()
// }
//
// suite.T().Run(name, setupTestTeardown)
// }
//}
//
//func testQueueWithRequiredSessionSendAndReceive(ctx context.Context, t *testing.T, queue *Queue) {
// numMessages := rand.Intn(100) + 20
// messages := make([]string, numMessages)
// for i := 0; i < numMessages; i++ {
// messages[i] = test.RandomString("hello", 10)
// }
//
// for _, message := range messages {
// err := queue.Send(ctx, NewMessageFromString(message))
// if err != nil {
// t.Fatal(err)
// }
// }
//
// var wg sync.WaitGroup
// wg.Add(numMessages)
// // ensure in-order processing of messages from the queue
// count := 0
// handler := func(ctx context.Context, event *Message) DispositionAction {
// if !assert.Equal(t, messages[count], string(event.Data)) {
// assert.FailNow(t, fmt.Sprintf("message %d %q didn't match %q", count, messages[count], string(event.Data)))
// }
// count++
// wg.Done()
// return event.Complete()
// }
// listenHandle, err := queue.Receive(ctx, handler)
// if err != nil {
// t.Fatal(err)
// }
// defer func() {
// ctx, cancel := context.WithTimeout(context.Background(), timeout)
// defer cancel()
// listenHandle.Close(ctx)
// }()
//
// end, _ := ctx.Deadline()
// waitUntil(t, &wg, time.Until(end))
//}
//
//func mustUUID(t *testing.T) uuid.UUID {
// id, err := uuid.NewV4()
// if err != nil {
// t.Fatal(err)
// }
// return id
//}
func makeQueue(ctx context.Context, t *testing.T, ns *Namespace, name string, opts ...QueueManagementOption) func() {
qm := ns.NewQueueManager()

Просмотреть файл

@ -97,6 +97,43 @@ func (r *receiver) Recover(ctx context.Context) error {
return r.newSessionAndLink(ctx)
}
func (r *receiver) ReceiveOne(ctx context.Context) (*MessageWithContext, error) {
span, ctx := r.startConsumerSpanFromContext(ctx, "sb.receiver.ReceiveOne")
defer span.Finish()
amqpMsg, err := r.listenForMessage(ctx)
if err != nil {
log.For(ctx).Error(err)
return nil, err
}
msg, err := messageFromAMQPMessage(amqpMsg)
if err != nil {
log.For(ctx).Error(err)
return nil, err
}
return r.messageToMessageWithContext(ctx, msg), nil
}
func (r *receiver) messageToMessageWithContext(ctx context.Context, msg *Message) (*MessageWithContext) {
const optName = "sb.receiver.amqpEventToMessageWithContext"
var span opentracing.Span
wireContext, err := extractWireContext(msg)
if err == nil {
span, ctx = r.startConsumerSpanFromWire(ctx, optName, wireContext)
} else {
span, ctx = r.startConsumerSpanFromContext(ctx, optName)
}
defer span.Finish()
span.SetTag("amqp.message-id", msg.ID)
return &MessageWithContext{
Message: msg,
Ctx: ctx,
}
}
// Listen start a listener for messages sent to the entity path
func (r *receiver) Listen(handler Handler) *ListenerHandle {
ctx, done := context.WithCancel(context.Background())
@ -129,25 +166,30 @@ func (r *receiver) handleMessages(ctx context.Context, messages chan *amqp.Messa
}
func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler Handler) {
event := messageFromAMQPMessage(msg)
const optName = "sb.receiver.handleMessage"
event, err := messageFromAMQPMessage(msg)
if err != nil {
_, ctx := r.startConsumerSpanFromContext(ctx, optName)
log.For(ctx).Error(err)
}
var span opentracing.Span
wireContext, err := extractWireContext(event)
if err == nil {
span, ctx = r.startConsumerSpanFromWire(ctx, "sb.receiver.handleMessage", wireContext)
span, ctx = r.startConsumerSpanFromWire(ctx, optName, wireContext)
} else {
span, ctx = r.startConsumerSpanFromContext(ctx, "sb.receiver.handleMessage")
span, ctx = r.startConsumerSpanFromContext(ctx, optName)
}
defer span.Finish()
id := messageID(msg)
span.SetTag("amqp.message-id", id)
dispositionAction := handler(ctx, event)
if r.mode == ReceiveAndDeleteMode {
// tell broker the message is completed since I've received it
event.Complete()(ctx)
return
}
dispositionAction := handler(ctx, event)
if dispositionAction != nil {
dispositionAction(ctx)
} else {
@ -243,9 +285,17 @@ func (r *receiver) newSessionAndLink(ctx context.Context) error {
return err
}
receiveMode := amqp.ModeSecond
sendMode := amqp.ModeUnsettled
if r.mode == ReceiveAndDeleteMode {
receiveMode = amqp.ModeFirst
sendMode = amqp.ModeSettled
}
opts := []amqp.LinkOption{
amqp.LinkSourceAddress(r.entityPath),
amqp.LinkReceiverSettle(amqp.ModeSecond),
amqp.LinkSenderSettle(sendMode),
amqp.LinkReceiverSettle(receiveMode),
amqp.LinkCredit(r.prefetch),
}

Просмотреть файл

@ -24,7 +24,7 @@ package servicebus
import (
"context"
"time"
"time"
"github.com/Azure/azure-amqp-common-go"
"github.com/Azure/azure-amqp-common-go/log"
@ -219,7 +219,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
amqpSender, err := amqpSession.NewSender(
amqp.LinkTargetAddress(s.getAddress()),
amqp.LinkSenderSettle(amqp.ModeUnsettled))
amqp.LinkSenderSettle(amqp.ModeMixed))
if err != nil {
log.For(ctx).Error(err)
return err
@ -245,4 +245,4 @@ func sendWithSession(sessionID string) senderOption {
event.sessionID = &sessionID
return nil
}
}
}