add system properties to Event

This commit is contained in:
David Justice 2019-05-06 11:09:38 -07:00
Родитель 8f96175bf0
Коммит bd38add7f7
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
6 изменённых файлов: 144 добавлений и 19 удалений

134
event.go
Просмотреть файл

@ -23,9 +23,13 @@ package eventhub
// SOFTWARE
import (
"fmt"
"reflect"
"strings"
"time"
"github.com/Azure/azure-amqp-common-go/persist"
"github.com/mitchellh/mapstructure"
"pack.ag/amqp"
)
@ -39,11 +43,12 @@ const (
type (
// Event is an Event Hubs message to be sent or received
Event struct {
Data []byte
PartitionKey *string
Properties map[string]interface{}
ID string
message *amqp.Message
Data []byte
PartitionKey *string
Properties map[string]interface{}
ID string
message *amqp.Message
SystemProperties *SystemProperties
}
// EventBatch is a batch of Event Hubs messages to be sent
@ -53,6 +58,20 @@ type (
Properties map[string]interface{}
ID string
}
// SystemProperties are used to store properties that are set by the system.
SystemProperties struct {
SequenceNumber *int64 `mapstructure:"x-opt-sequence-number"` // unique sequence number of the message
EnqueuedTime *time.Time `mapstructure:"x-opt-enqueued-time"` // time the message landed in the message queue
Offset *int64 `mapstructure:"x-opt-offset"`
PartitionID *int16 `mapstructure:"x-opt-partition-id"`
PartitionKey *string `mapstructure:"x-opt-partition-key"`
}
mapStructureTag struct {
Name string
PersistEmpty bool
}
)
// NewEventFromString builds an Event from a string message
@ -114,7 +133,7 @@ func (e *Event) Get(key string) (interface{}, bool) {
return nil, false
}
func (e *Event) toMsg() *amqp.Message {
func (e *Event) toMsg() (*amqp.Message, error) {
msg := e.message
if msg == nil {
msg = amqp.NewMessage(e.Data)
@ -131,11 +150,20 @@ func (e *Event) toMsg() *amqp.Message {
}
}
if e.SystemProperties != nil {
sysPropMap, err := encodeStructureToMap(e.SystemProperties)
if err != nil {
return nil, err
}
msg.Annotations = annotationsFromMap(sysPropMap)
}
if e.PartitionKey != nil {
msg.Annotations = make(amqp.Annotations)
msg.Annotations[partitionKeyAnnotationName] = e.PartitionKey
}
return msg
return msg, nil
}
func (b *EventBatch) toEvent() (*Event, error) {
@ -153,7 +181,11 @@ func (b *EventBatch) toEvent() (*Event, error) {
}
for idx, event := range b.Events {
innerMsg := event.toMsg()
innerMsg, err := event.toMsg()
if err != nil {
return nil, err
}
bin, err := innerMsg.MarshalBinary()
if err != nil {
return nil, err
@ -161,14 +193,14 @@ func (b *EventBatch) toEvent() (*Event, error) {
msg.Data[idx] = bin
}
return eventFromMsg(msg), nil
return eventFromMsg(msg)
}
func eventFromMsg(msg *amqp.Message) *Event {
func eventFromMsg(msg *amqp.Message) (*Event, error) {
return newEvent(msg.Data[0], msg)
}
func newEvent(data []byte, msg *amqp.Message) *Event {
func newEvent(data []byte, msg *amqp.Message) (*Event, error) {
event := &Event{
Data: data,
message: msg,
@ -188,8 +220,86 @@ func newEvent(data []byte, msg *amqp.Message) *Event {
}
}
if msg.Annotations != nil {
if err := mapstructure.WeakDecode(msg.Annotations, &event.SystemProperties); err != nil {
fmt.Println("error decoding...", err)
return event, err
}
}
if msg != nil {
event.Properties = msg.ApplicationProperties
}
return event
return event, nil
}
func encodeStructureToMap(structPointer interface{}) (map[string]interface{}, error) {
valueOfStruct := reflect.ValueOf(structPointer)
s := valueOfStruct.Elem()
if s.Kind() != reflect.Struct {
return nil, fmt.Errorf("must provide a struct")
}
encoded := make(map[string]interface{})
for i := 0; i < s.NumField(); i++ {
f := s.Field(i)
if f.IsValid() && f.CanSet() {
tf := s.Type().Field(i)
tag, err := parseMapStructureTag(tf.Tag)
if err != nil {
return nil, err
}
if tag != nil {
switch f.Kind() {
case reflect.Ptr:
if !f.IsNil() || tag.PersistEmpty {
if f.IsNil() {
encoded[tag.Name] = nil
} else {
encoded[tag.Name] = f.Elem().Interface()
}
}
default:
if f.Interface() != reflect.Zero(f.Type()).Interface() || tag.PersistEmpty {
encoded[tag.Name] = f.Interface()
}
}
}
}
}
return encoded, nil
}
func parseMapStructureTag(tag reflect.StructTag) (*mapStructureTag, error) {
str, ok := tag.Lookup("mapstructure")
if !ok {
return nil, nil
}
mapTag := new(mapStructureTag)
split := strings.Split(str, ",")
mapTag.Name = strings.TrimSpace(split[0])
if len(split) > 1 {
for _, tagKey := range split[1:] {
switch tagKey {
case "persistempty":
mapTag.PersistEmpty = true
default:
return nil, fmt.Errorf("key %q is not understood", tagKey)
}
}
}
return mapTag, nil
}
func annotationsFromMap(m map[string]interface{}) amqp.Annotations {
a := make(amqp.Annotations)
for key, val := range m {
a[key] = val
}
return a
}

Просмотреть файл

@ -97,7 +97,7 @@ func ExampleHub_webSocket() {
}
// Create a client to communicate with EventHub
hub, err := eventhub.NewHubFromConnectionString(connStr + ";EntityPath=" + hubEntity.Name, eventhub.HubWithWebSocketConnection())
hub, err := eventhub.NewHubFromConnectionString(connStr+";EntityPath="+hubEntity.Name, eventhub.HubWithWebSocketConnection())
if err != nil {
fmt.Println(err)
return

Просмотреть файл

@ -380,6 +380,11 @@ func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par
count := 0
_, err := client.Receive(ctx, partitionID, func(ctx context.Context, event *Event) error {
assert.Equal(t, messages[count], string(event.Data))
require.NotNil(t, event.SystemProperties)
assert.NotNil(t, event.SystemProperties.EnqueuedTime)
assert.NotNil(t, event.SystemProperties.Offset)
assert.NotNil(t, event.SystemProperties.SequenceNumber)
assert.Equal(t, int64(count), *event.SystemProperties.SequenceNumber)
count++
wg.Done()
return nil

Просмотреть файл

@ -220,8 +220,8 @@ func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) {
break
}
}
} else {
suite.T().Logf("%q does not contain %q", *val.Name, suite.TagID)
} else if !strings.HasPrefix(*val.Name, "examplehub_") {
suite.T().Logf("%q does not contain %q, so it won't be deleted.", *val.Name, suite.TagID)
}
}
suite.NoError(res.Next())

Просмотреть файл

@ -228,7 +228,13 @@ func (r *receiver) handleMessages(ctx context.Context, messages chan *amqp.Messa
}
func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler Handler) {
event := eventFromMsg(msg)
event, err := eventFromMsg(msg)
if err != nil {
log.For(ctx).Error(err)
r.lastError = err
r.done()
}
var span *trace.Span
if val, ok := event.Get("_oc_prop"); ok {
if sc, ok := propagation.FromBinary(val.([]byte)); ok {
@ -245,7 +251,7 @@ func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler
span.AddAttributes(trace.StringAttribute("eh.message_id", str))
}
err := handler(ctx, event)
err = handler(ctx, event)
if err != nil {
err = msg.Modify(true, false, nil)
if err != nil {

Просмотреть файл

@ -53,7 +53,7 @@ type (
eventer interface {
Set(key string, value interface{})
toMsg() *amqp.Message
toMsg() (*amqp.Message, error)
}
)
@ -152,7 +152,11 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
defer sp.End()
evt.Set("_oc_prop", propagation.Binary(sp.SpanContext()))
msg := evt.toMsg()
msg, err := evt.toMsg()
if err != nil {
return err
}
if str, ok := msg.Properties.MessageID.(string); ok {
sp.AddAttributes(trace.StringAttribute("he.message_id", str))
}