Support sending and receiving custom annotations in System Properties (#174)

* Support sending and receiving custom annotations in System Properties

* fix terraform setup and make tests pass

* remove extra annotation assignment

* don't allow partition key to overwrite annotations

* update version number

Co-authored-by: Joel Hendrix <jhendrix@microsoft.com>
This commit is contained in:
Jeff Principe 2020-06-08 12:35:13 -07:00 коммит произвёл GitHub
Родитель ecad5a51c1
Коммит 9898ba6578
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 78 добавлений и 19 удалений

Просмотреть файл

@ -28,9 +28,9 @@ import (
"time"
"github.com/Azure/azure-amqp-common-go/v3/rpc"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
"github.com/mitchellh/mapstructure"
"github.com/Azure/go-amqp"
)
const (

Просмотреть файл

@ -78,9 +78,9 @@ resource "random_string" "secret" {
resource "azuread_application" "test" {
count = data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0
name = "eventhubstest"
homepage = "https://eventhubstest"
identifier_uris = ["https://eventhubstest"]
reply_urls = ["https://eventhubstest"]
homepage = "https://eventhubstest-${random_string.name.result}"
identifier_uris = ["https://eventhubstest-${random_string.name.result}"]
reply_urls = ["https://eventhubstest-${random_string.name.result}"]
available_to_other_tenants = false
oauth2_allow_implicit_flow = true
}

Просмотреть файл

@ -44,7 +44,7 @@ const (
batchMessageWrapperSize = 100
// KeyOfNoPartitionKey is the key value in Events map for Events which do not have PartitionKey
KeyOfNoPartitionKey = "NoPartitionKey"
KeyOfNoPartitionKey = "NoPartitionKey"
)
// BatchWithMaxSizeInBytes configures the EventBatchIterator to fill the batch to the specified max size in bytes
@ -65,7 +65,7 @@ func NewEventBatchIterator(events ...*Event) *EventBatchIterator {
if event.PartitionKey == nil {
key = KeyOfNoPartitionKey
} else {
key = * event.PartitionKey
key = *event.PartitionKey
}
if _, ok = partitionEventMap[key]; !ok {
cursors[key] = 0

Просмотреть файл

@ -1,5 +1,8 @@
# Change Log
## `v3.3.0`
- add support for sending and receiving custom annotations
## `v3.2.0`
- add IoT Hub system properties

Просмотреть файл

@ -69,6 +69,8 @@ type (
IoTHubConnectionModuleID *string `mapstructure:"iothub-connection-module-id"`
// Nil for messages other than from Azure IoT Hub. The time the Device-to-Cloud message was received by IoT Hub.
IoTHubEnqueuedTime *time.Time `mapstructure:"iothub-enqueuedtime"`
// Raw annotations provided on the message. Includes any additional System Properties that are not explicitly mapped.
Annotations map[string]interface{} `mapstructure:"-"`
}
mapStructureTag struct {
@ -152,15 +154,22 @@ func (e *Event) toMsg() (*amqp.Message, error) {
}
if e.SystemProperties != nil {
// Set the raw annotations first (they may be nil) and add the explicit
// system properties second to ensure they're set properly.
msg.Annotations = addMapToAnnotations(msg.Annotations, e.SystemProperties.Annotations)
sysPropMap, err := encodeStructureToMap(e.SystemProperties)
if err != nil {
return nil, err
}
msg.Annotations = annotationsFromMap(sysPropMap)
msg.Annotations = addMapToAnnotations(msg.Annotations, sysPropMap)
}
if e.PartitionKey != nil {
msg.Annotations = make(amqp.Annotations)
if msg.Annotations == nil {
msg.Annotations = make(amqp.Annotations)
}
msg.Annotations[partitionKeyAnnotationName] = e.PartitionKey
}
@ -189,13 +198,33 @@ func newEvent(data []byte, msg *amqp.Message) (*Event, error) {
event.PartitionKey = &valStr
}
}
}
if msg.Annotations != nil {
if err := mapstructure.WeakDecode(msg.Annotations, &event.SystemProperties); err != nil {
fmt.Println("error decoding...", err)
return event, err
}
// If we didn't populate any system properties, set up the struct so we
// can put the annotations in it
if event.SystemProperties == nil {
event.SystemProperties = new(SystemProperties)
}
// Take all string-keyed annotations because the protocol reserves all
// numeric keys for itself and there are no numeric keys defined in the
// protocol today:
//
// http://www.amqp.org/sites/amqp.org/files/amqp.pdf (section 3.2.10)
//
// This approach is also consistent with the behavior of .NET:
//
// https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventdata.systemproperties?view=azure-dotnet#Azure_Messaging_EventHubs_EventData_SystemProperties
event.SystemProperties.Annotations = make(map[string]interface{})
for key, val := range msg.Annotations {
if s, ok := key.(string); ok {
event.SystemProperties.Annotations[s] = val
}
}
}
if msg != nil {
@ -222,6 +251,11 @@ func encodeStructureToMap(structPointer interface{}) (map[string]interface{}, er
return nil, err
}
// Skip any entries with an exclude tag
if tag.Name == "-" {
continue
}
if tag != nil {
switch f.Kind() {
case reflect.Ptr:
@ -267,8 +301,10 @@ func parseMapStructureTag(tag reflect.StructTag) (*mapStructureTag, error) {
return mapTag, nil
}
func annotationsFromMap(m map[string]interface{}) amqp.Annotations {
a := make(amqp.Annotations)
func addMapToAnnotations(a amqp.Annotations, m map[string]interface{}) amqp.Annotations {
if a == nil && len(m) > 0 {
a = make(amqp.Annotations)
}
for key, val := range m {
a[key] = val
}

3
go.mod
Просмотреть файл

@ -19,6 +19,7 @@ require (
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7
github.com/mitchellh/mapstructure v1.1.2
github.com/sirupsen/logrus v1.2.0
github.com/stretchr/testify v1.3.0
github.com/stretchr/testify v1.5.1
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
gopkg.in/yaml.v2 v2.2.8 // indirect
)

7
go.sum
Просмотреть файл

@ -73,6 +73,8 @@ github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g=
@ -87,5 +89,10 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbKtsY9fheQiQn0MZRVLfQ=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

2
hub.go
Просмотреть файл

@ -39,11 +39,11 @@ import (
"github.com/Azure/azure-amqp-common-go/v3/sas"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-amqp"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/date"
"github.com/Azure/go-autorest/autorest/to"
"github.com/devigned/tab"
"github.com/Azure/go-amqp"
"github.com/Azure/azure-event-hubs-go/v3/atom"
"github.com/Azure/azure-event-hubs-go/v3/persist"

Просмотреть файл

@ -31,6 +31,7 @@ import (
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"sync"
"testing"
@ -368,7 +369,13 @@ func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par
}
for idx, message := range messages {
if !assert.NoError(t, client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx)))) {
event := NewEventFromString(message)
event.SystemProperties = &SystemProperties{
Annotations: map[string]interface{}{
"x-opt-custom-annotation": "custom-value",
},
}
if !assert.NoError(t, client.Send(ctx, event, SendWithMessageID(fmt.Sprintf("%d", idx)))) {
assert.FailNow(t, "unable to send event")
}
}
@ -381,6 +388,11 @@ func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par
assert.NotNil(t, event.SystemProperties.Offset)
assert.NotNil(t, event.SystemProperties.SequenceNumber)
assert.Equal(t, int64(count), *event.SystemProperties.SequenceNumber)
require.NotNil(t, event.SystemProperties.Annotations)
assert.Equal(t, *event.SystemProperties.EnqueuedTime, event.SystemProperties.Annotations["x-opt-enqueued-time"].(time.Time))
assert.Equal(t, strconv.FormatInt(*event.SystemProperties.Offset, 10), event.SystemProperties.Annotations["x-opt-offset"].(string))
assert.Equal(t, *event.SystemProperties.SequenceNumber, event.SystemProperties.Annotations["x-opt-sequence-number"].(int64))
assert.Equal(t, "custom-value", event.SystemProperties.Annotations["x-opt-custom-annotation"].(string))
count++
wg.Done()
return nil

Просмотреть файл

@ -31,9 +31,9 @@ import (
"github.com/Azure/azure-amqp-common-go/v3/cbs"
"github.com/Azure/azure-amqp-common-go/v3/conn"
"github.com/Azure/azure-amqp-common-go/v3/sas"
"github.com/Azure/go-amqp"
"github.com/Azure/go-autorest/autorest/azure"
"golang.org/x/net/websocket"
"github.com/Azure/go-amqp"
)
type (

Просмотреть файл

@ -28,8 +28,8 @@ import (
"time"
common "github.com/Azure/azure-amqp-common-go/v3"
"github.com/devigned/tab"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
"github.com/Azure/azure-event-hubs-go/v3/persist"
)

Просмотреть файл

@ -29,9 +29,9 @@ import (
"time"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
"github.com/jpillora/backoff"
"github.com/Azure/go-amqp"
)
// sender provides session and link handling for an sending entity path

Просмотреть файл

@ -2,5 +2,5 @@ package eventhub
const (
// Version is the semantic version number
Version = "3.2.0"
Version = "3.3.0"
)