Support send and receiving custom annotations in System Properties (#169)

* Support send and receiving custom annotations in System Properties

* update changelog and version number

Co-authored-by: Joel Hendrix <jhendrix@microsoft.com>
This commit is contained in:
Jeff Principe 2020-06-03 12:15:05 -07:00 коммит произвёл GitHub
Родитель a123c25216
Коммит dfc63648e7
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 83 добавлений и 22 удалений

Просмотреть файл

@ -63,9 +63,9 @@ resource "random_string" "secret" {
resource "azuread_application" "test" {
count = data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0
name = "servicebustest"
homepage = "https://servicebustest"
identifier_uris = ["https://servicebustest"]
reply_urls = ["https://servicebustest"]
homepage = "https://servicebustest-${random_string.name.result}"
identifier_uris = ["https://servicebustest-${random_string.name.result}"]
reply_urls = ["https://servicebustest-${random_string.name.result}"]
available_to_other_tenants = false
oauth2_allow_implicit_flow = true
}
@ -160,4 +160,4 @@ output "AZURE_CLIENT_ID" {
output "AZURE_CLIENT_SECRET" {
value = compact(concat(azuread_service_principal_password.test.*.value, list(var.azure_client_secret)))[0]
sensitive = true
}
}

Просмотреть файл

@ -1,5 +1,11 @@
# Change Log
## `v0.10.2`
- add support for sending and receiving custom annotations
- added some missing AMQP span attributes
- fixed propagation of sender/receiver close context
- don't panic on empty AMQP payloads
## `v0.10.1`
- fix nil pointer dereference for concurrent uses of Send() [issue #149](https://github.com/Azure/azure-service-bus-go/issues/149)
- fix nil pointer dereference when there are no listeners [PR #151](https://github.com/Azure/azure-service-bus-go/pull/151)
@ -59,4 +65,4 @@
- Ensure senders wait for message disposition before returning
## `v0.1.0`
- initial tag for Service Bus which includes Queues, Topics and Subscriptions using AMQP
- initial tag for Service Bus which includes Queues, Topics and Subscriptions using AMQP

Просмотреть файл

@ -68,15 +68,16 @@ type (
// SystemProperties are used to store properties that are set by the system.
SystemProperties struct {
LockedUntil *time.Time `mapstructure:"x-opt-locked-until"`
SequenceNumber *int64 `mapstructure:"x-opt-sequence-number"`
PartitionID *int16 `mapstructure:"x-opt-partition-id"`
PartitionKey *string `mapstructure:"x-opt-partition-key"`
EnqueuedTime *time.Time `mapstructure:"x-opt-enqueued-time"`
DeadLetterSource *string `mapstructure:"x-opt-deadletter-source"`
ScheduledEnqueueTime *time.Time `mapstructure:"x-opt-scheduled-enqueue-time"`
EnqueuedSequenceNumber *int64 `mapstructure:"x-opt-enqueue-sequence-number"`
ViaPartitionKey *string `mapstructure:"x-opt-via-partition-key"`
LockedUntil *time.Time `mapstructure:"x-opt-locked-until"`
SequenceNumber *int64 `mapstructure:"x-opt-sequence-number"`
PartitionID *int16 `mapstructure:"x-opt-partition-id"`
PartitionKey *string `mapstructure:"x-opt-partition-key"`
EnqueuedTime *time.Time `mapstructure:"x-opt-enqueued-time"`
DeadLetterSource *string `mapstructure:"x-opt-deadletter-source"`
ScheduledEnqueueTime *time.Time `mapstructure:"x-opt-scheduled-enqueue-time"`
EnqueuedSequenceNumber *int64 `mapstructure:"x-opt-enqueue-sequence-number"`
ViaPartitionKey *string `mapstructure:"x-opt-via-partition-key"`
Annotations map[string]interface{} `mapstructure:"-"`
}
mapStructureTag struct {
@ -364,11 +365,15 @@ func (m *Message) toMsg() (*amqp.Message, error) {
}
if m.SystemProperties != nil {
// Set the raw annotations first (they may be nil) and add the explicit
// system properties second to ensure they're set properly.
amqpMsg.Annotations = addMapToAnnotations(amqpMsg.Annotations, m.SystemProperties.Annotations)
sysPropMap, err := encodeStructureToMap(m.SystemProperties)
if err != nil {
return nil, err
}
amqpMsg.Annotations = annotationsFromMap(sysPropMap)
amqpMsg.Annotations = addMapToAnnotations(amqpMsg.Annotations, sysPropMap)
}
if m.LockToken != nil {
@ -381,8 +386,11 @@ func (m *Message) toMsg() (*amqp.Message, error) {
return amqpMsg, nil
}
func annotationsFromMap(m map[string]interface{}) amqp.Annotations {
a := make(amqp.Annotations)
func addMapToAnnotations(a amqp.Annotations, m map[string]interface{}) amqp.Annotations {
if a == nil && len(m) > 0 {
a = make(amqp.Annotations)
}
for key, val := range m {
a[key] = val
}
@ -434,6 +442,28 @@ func newMessage(data []byte, amqpMsg *amqp.Message) (*Message, error) {
if err := mapstructure.Decode(amqpMsg.Annotations, &msg.SystemProperties); err != nil {
return msg, err
}
// If we didn't populate any system properties, set up the struct so we
// can put the annotations in it
if msg.SystemProperties == nil {
msg.SystemProperties = new(SystemProperties)
}
// Take all string-keyed annotations because the protocol reserves all
// numeric keys for itself and there are no numeric keys defined in the
// protocol today:
//
// http://www.amqp.org/sites/amqp.org/files/amqp.pdf (section 3.2.10)
//
// This approach is also consistent with the behavior of .NET:
//
// https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventdata.systemproperties?view=azure-dotnet#Azure_Messaging_EventHubs_EventData_SystemProperties
msg.SystemProperties.Annotations = make(map[string]interface{})
for key, val := range amqpMsg.Annotations {
if s, ok := key.(string); ok {
msg.SystemProperties.Annotations[s] = val
}
}
}
if amqpMsg.DeliveryTag != nil && len(amqpMsg.DeliveryTag) > 0 {
@ -500,6 +530,11 @@ func encodeStructureToMap(structPointer interface{}) (map[string]interface{}, er
return nil, err
}
// Skip any entries with an exclude tag
if tag.Name == "-" {
continue
}
if tag != nil {
switch f.Kind() {
case reflect.Ptr:

Просмотреть файл

@ -8,8 +8,8 @@ import (
"time"
"github.com/Azure/azure-amqp-common-go/v3/rpc"
"github.com/devigned/tab"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
)
// MessageSession represents and allows for interaction with a Service Bus Session.

Просмотреть файл

@ -4,9 +4,9 @@ import (
"time"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/go-amqp"
"github.com/Azure/go-autorest/autorest/to"
"github.com/mitchellh/mapstructure"
"github.com/Azure/go-amqp"
)
func (suite *serviceBusSuite) TestMapStructureEncode() {
@ -74,6 +74,10 @@ func (suite *serviceBusSuite) TestMessageToAMQPMessage() {
ScheduledEnqueueTime: &until,
EnqueuedSequenceNumber: to.Int64Ptr(1),
ViaPartitionKey: to.StringPtr("via"),
Annotations: map[string]interface{}{
"custom": "annotation",
"x-opt-partition-key": "other value",
},
},
UserProperties: map[string]interface{}{
"test": "foo",
@ -100,6 +104,17 @@ func (suite *serviceBusSuite) TestMessageToAMQPMessage() {
for key, val := range sysPropMap {
suite.Equal(val, aMsg.Annotations[key], key)
}
for key, val := range msg.SystemProperties.Annotations {
// The partition key should be overridden by the value in the
// base system properties
if key == "x-opt-partition-key" {
suite.Equal(*msg.SystemProperties.PartitionKey, aMsg.Annotations[key], key)
continue
}
suite.Equal(val, aMsg.Annotations[key], key)
}
}
for key, val := range msg.UserProperties {
@ -147,6 +162,7 @@ func (suite *serviceBusSuite) TestAMQPMessageToMessage() {
"x-opt-scheduled-enqueue-time": until,
"x-opt-enqueue-sequence-number": int64(1),
"x-opt-via-partition-key": "via",
"custom-annotation": "value",
},
ApplicationProperties: map[string]interface{}{
"test": "foo",
@ -177,6 +193,10 @@ func (suite *serviceBusSuite) TestAMQPMessageToMessage() {
for key, val := range sysPropMap {
suite.Equal(val, aMsg.Annotations[key], key)
}
for key, val := range msg.SystemProperties.Annotations {
suite.Equal(val, aMsg.Annotations[key], key)
}
}
for key, val := range aMsg.ApplicationProperties {

Просмотреть файл

@ -49,7 +49,7 @@ const (
//`
// Version is the semantic version number
Version = "0.10.1"
Version = "0.10.2"
rootUserAgent = "/golang-service-bus"
)

2
rpc.go
Просмотреть файл

@ -32,8 +32,8 @@ import (
"github.com/Azure/azure-amqp-common-go/v3/rpc"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/devigned/tab"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
)
type (

Просмотреть файл

@ -28,8 +28,8 @@ import (
"sync/atomic"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/devigned/tab"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
)
type (