batch iterator rather than event batch

This commit is contained in:
David Justice 2019-05-30 10:56:46 -07:00
Родитель 40482cf6ad
Коммит e7ca69248d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
8 изменённых файлов: 243 добавлений и 71 удалений

174
batch.go Normal file
Просмотреть файл

@ -0,0 +1,174 @@
package eventhub
import (
"github.com/Azure/azure-amqp-common-go/uuid"
"pack.ag/amqp"
)
type (
// BatchOptions are optional information to add to a batch of messages
BatchOptions struct {
MaxSize MaxMessageSizeInBytes
}
// BatchIterator offers a simple mechanism for batching a list of events
BatchIterator interface {
Done() bool
Next(messageID string, opts *BatchOptions) (*EventBatch, error)
}
// EventBatchIterator provides an easy way to iterate over a slice of events to reliably create batches
EventBatchIterator struct {
Events []*Event
Cursor int
}
// EventBatch is a batch of Event Hubs messages to be sent
EventBatch struct {
*Event
marshaledMessages [][]byte
MaxSize MaxMessageSizeInBytes
size int
}
// BatchOption provides a way to configure `BatchOptions`
BatchOption func(opt *BatchOptions) error
// MaxMessageSizeInBytes is the max number of bytes allowed by Azure Service Bus
MaxMessageSizeInBytes uint
)
const (
// DefaultMaxMessageSizeInBytes is the maximum number of bytes in an event (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas)
DefaultMaxMessageSizeInBytes MaxMessageSizeInBytes = 1000000
batchMessageWrapperSize = 100
)
// BatchWithMaxSizeInBytes configures the EventBatchIterator to fill the batch to the specified max size in bytes
func BatchWithMaxSizeInBytes(sizeInBytes int) BatchOption {
return func(batchOption *BatchOptions) error {
batchOption.MaxSize = MaxMessageSizeInBytes(sizeInBytes)
return nil
}
}
// NewEventBatchIterator wraps a slice of `Event` pointers to allow it to be made into a `EventBatchIterator`.
func NewEventBatchIterator(events ...*Event) *EventBatchIterator {
return &EventBatchIterator{
Events: events,
}
}
// Done communicates whether there are more messages remaining to be iterated over.
func (ebi *EventBatchIterator) Done() bool {
return len(ebi.Events) == ebi.Cursor
}
// Next fetches the batch of messages in the message slice at a position one larger than the last one accessed.
func (ebi *EventBatchIterator) Next(eventID string, opts *BatchOptions) (*EventBatch, error) {
if ebi.Done() {
return nil, ErrNoMessages{}
}
if opts == nil {
opts = &BatchOptions{
MaxSize: DefaultMaxMessageSizeInBytes,
}
}
eb := NewEventBatch(eventID, opts)
for ebi.Cursor < len(ebi.Events) {
ok, err := eb.Add(ebi.Events[ebi.Cursor])
if err != nil {
return nil, err
}
if !ok {
return eb, nil
}
ebi.Cursor++
}
return eb, nil
}
// NewEventBatch builds a new event batch
func NewEventBatch(eventID string, opts *BatchOptions) *EventBatch {
if opts == nil {
opts = &BatchOptions{
MaxSize: DefaultMaxMessageSizeInBytes,
}
}
mb := &EventBatch{
MaxSize: opts.MaxSize,
Event: &Event{
ID: eventID,
},
}
return mb
}
// Add adds a message to the batch if the message will not exceed the max size of the batch
func (eb *EventBatch) Add(e *Event) (bool, error) {
e.PartitionKey = eb.PartitionKey
msg, err := e.toMsg()
if err != nil {
return false, err
}
if msg.Properties.MessageID == nil || msg.Properties.MessageID == "" {
uid, err := uuid.NewV4()
if err != nil {
return false, err
}
msg.Properties.MessageID = uid.String()
}
bin, err := msg.MarshalBinary()
if err != nil {
return false, err
}
if eb.Size()+len(bin) > int(eb.MaxSize) {
return false, nil
}
eb.size += len(bin)
eb.marshaledMessages = append(eb.marshaledMessages, bin)
return true, nil
}
// Clear will zero out the batch size and clear the buffered messages
func (eb *EventBatch) Clear() {
eb.marshaledMessages = [][]byte{}
eb.size = 0
}
// Size is the number of bytes in the message batch
func (eb *EventBatch) Size() int {
// calculated data size + batch message wrapper + data wrapper portions of the message
return eb.size + batchMessageWrapperSize + (len(eb.marshaledMessages) * 5)
}
func (eb *EventBatch) toMsg() (*amqp.Message, error) {
batchMessage := eb.amqpBatchMessage()
batchMessage.Data = make([][]byte, len(eb.marshaledMessages))
for idx, bytes := range eb.marshaledMessages {
batchMessage.Data[idx] = bytes
}
return batchMessage, nil
}
func (eb *EventBatch) amqpBatchMessage() *amqp.Message {
return &amqp.Message{
Data: make([][]byte, len(eb.marshaledMessages)),
Format: batchMessageFormat,
Properties: &amqp.MessageProperties{
MessageID: eb.ID,
},
}
}

Просмотреть файл

@ -2,7 +2,10 @@
## `v2.0.0`
- **breaking change** moved github.com/Azure/azure-amqp-common-go/persist to github.com/Azure/azure-event-hubs-go/persist
- **breaking change:** moved github.com/Azure/azure-amqp-common-go/persist to
github.com/Azure/azure-event-hubs-go/persist
- **breaking change:** changed batch message sending to use a safe batch iterator rather than leaving batch sizing to
the consumer.
- move tracing to devigned/tab so to not have to take a direct dependency on opentracing or opencensus
## `v1.3.1`

Просмотреть файл

@ -33,9 +33,10 @@ import (
"github.com/Azure/azure-amqp-common-go/aad"
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/stretchr/testify/suite"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/stretchr/testify/suite"
)
const (
@ -103,7 +104,7 @@ func (s *testSuite) TestSingle() {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
processor.RegisterHandler(ctx, func(c context.Context, event *eventhub.Event) error {
_, _ = processor.RegisterHandler(ctx, func(c context.Context, event *eventhub.Event) error {
wg.Done()
return nil
})
@ -111,7 +112,7 @@ func (s *testSuite) TestSingle() {
s.NoError(processor.StartNonBlocking(context.Background()))
defer func() {
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
processor.Close(closeContext)
_ = processor.Close(closeContext)
cancel()
del()
}()
@ -136,14 +137,14 @@ func (s *testSuite) TestMultiple() {
s.Require().NoError(err)
processors[processor.GetName()] = processor
processor.StartNonBlocking(ctx)
s.Require().NoError(processor.StartNonBlocking(ctx))
processorNames[i] = processor.GetName()
}
defer func() {
for _, processor := range processors {
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
processor.Close(closeContext)
_ = processor.Close(closeContext)
cancel()
}
del()
@ -175,7 +176,7 @@ func (s *testSuite) TestMultiple() {
s.Require().True(balanced, "never balanced work within allotted time")
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
processors[processorNames[numPartitions-1]].Close(closeContext) // close the last partition
s.Require().NoError(processors[processorNames[numPartitions-1]].Close(closeContext)) // close the last partition
delete(processors, processorNames[numPartitions-1])
cancel()
@ -211,7 +212,7 @@ func (s *testSuite) sendMessages(hubName string, length int) ([]string, error) {
client := s.newClient(s.T(), hubName)
defer func() {
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
client.Close(closeContext)
_ = client.Close(closeContext)
cancel()
}()
@ -227,7 +228,11 @@ func (s *testSuite) sendMessages(hubName string, length int) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client.SendBatch(ctx, eventhub.NewEventBatch(events))
ebi := eventhub.NewEventBatchIterator(events...)
err := client.SendBatch(ctx, ebi)
if err != nil {
return nil, err
}
return messages, ctx.Err()
}

11
errors.go Normal file
Просмотреть файл

@ -0,0 +1,11 @@
package eventhub
type (
// ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be
// more messages in the future.
ErrNoMessages struct{}
)
func (e ErrNoMessages) Error() string {
return "no messages available"
}

Просмотреть файл

@ -52,14 +52,6 @@ type (
SystemProperties *SystemProperties
}
// EventBatch is a batch of Event Hubs messages to be sent
EventBatch struct {
Events []*Event
PartitionKey *string
Properties map[string]interface{}
ID string
}
// SystemProperties are used to store properties that are set by the system.
SystemProperties struct {
SequenceNumber *int64 `mapstructure:"x-opt-sequence-number"` // unique sequence number of the message
@ -87,13 +79,6 @@ func NewEvent(data []byte) *Event {
}
}
// NewEventBatch builds an EventBatch from an array of Events
func NewEventBatch(events []*Event) *EventBatch {
return &EventBatch{
Events: events,
}
}
// GetCheckpoint returns the checkpoint information on the Event
func (e *Event) GetCheckpoint() persist.Checkpoint {
var offset string
@ -172,36 +157,6 @@ func (e *Event) toMsg() (*amqp.Message, error) {
return msg, nil
}
func (b *EventBatch) toEvent() (*Event, error) {
msg := &amqp.Message{
Data: make([][]byte, len(b.Events)),
Properties: &amqp.MessageProperties{
MessageID: b.ID,
},
Format: batchMessageFormat,
}
if b.PartitionKey != nil {
msg.Annotations = make(amqp.Annotations)
msg.Annotations[partitionKeyAnnotationName] = b.PartitionKey
}
for idx, event := range b.Events {
innerMsg, err := event.toMsg()
if err != nil {
return nil, err
}
bin, err := innerMsg.MarshalBinary()
if err != nil {
return nil, err
}
msg.Data[idx] = bin
}
return eventFromMsg(msg)
}
func eventFromMsg(msg *amqp.Message) (*Event, error) {
return newEvent(msg.Data[0], msg)
}

42
hub.go
Просмотреть файл

@ -37,6 +37,7 @@ import (
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/Azure/azure-amqp-common-go/conn"
"github.com/Azure/azure-amqp-common-go/sas"
"github.com/Azure/azure-amqp-common-go/uuid"
"github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/date"
@ -635,24 +636,49 @@ func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error
return sender.Send(ctx, event, opts...)
}
// SendBatch sends an EventBatch to the Event Hub
//
// SendBatch will retry sending the message for as long as the context allows
func (h *Hub) SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error {
// SendBatch sends a batch of events to the Hub
func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.SendBatch")
defer span.End()
sender, err := h.getSender(ctx)
if err != nil {
tab.For(ctx).Error(err)
return err
}
event, err := batch.toEvent()
if err != nil {
return err
batchOptions := &BatchOptions{
MaxSize: DefaultMaxMessageSizeInBytes,
}
return sender.Send(ctx, event, opts...)
for _, opt := range opts {
if err := opt(batchOptions); err != nil {
tab.For(ctx).Error(err)
return err
}
}
for !iterator.Done() {
id, err := uuid.NewV4()
if err != nil {
tab.For(ctx).Error(err)
return err
}
batch, err := iterator.Next(id.String(), batchOptions)
if err != nil {
tab.For(ctx).Error(err)
return err
}
if err := sender.trySend(ctx, batch); err != nil {
tab.For(ctx).Error(err)
return err
}
}
return nil
}
// HubWithPartitionedSender configures the Hub instance to send to a specific event Hub partition

Просмотреть файл

@ -326,11 +326,9 @@ func testBatchSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par
for idx, msg := range messages {
events[idx] = NewEventFromString(msg)
}
batch := &EventBatch{
Events: events,
}
if assert.NoError(t, client.SendBatch(ctx, batch)) {
ebi := NewEventBatchIterator(events...)
if assert.NoError(t, client.SendBatch(ctx, ebi)) {
count := 0
_, err := client.Receive(context.Background(), partitionID, func(ctx context.Context, event *Event) error {
assert.Equal(t, messages[count], string(event.Data))
@ -354,11 +352,9 @@ func testBatchSendTooLarge(ctx context.Context, t *testing.T, client *Hub, _ str
for idx := range events {
events[idx] = NewEventFromString(test.RandomString("foo", 10))
}
batch := &EventBatch{
Events: events,
}
assert.EqualError(t, client.SendBatch(ctx, batch), "encoded message size exceeds max of 1046528")
ebi := NewEventBatchIterator(events...)
assert.EqualError(t, client.SendBatch(ctx, ebi, BatchWithMaxSizeInBytes(10000000)), "encoded message size exceeds max of 1046528")
}
func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, partitionID string) {

Просмотреть файл

@ -223,7 +223,9 @@ func (ts *testSuite) sendMessages(hubName string, length int) ([]string, error)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ts.NoError(client.SendBatch(ctx, eventhub.NewEventBatch(events)))
ebi := eventhub.NewEventBatchIterator(events...)
ts.NoError(client.SendBatch(ctx, ebi))
return messages, ctx.Err()
}