Drop messages which 1MB bigger
This commit is contained in:
Родитель
d4f0c5777f
Коммит
99af387d78
11
batch.go
11
batch.go
|
@ -1,6 +1,8 @@
|
|||
package eventhub
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/v3/uuid"
|
||||
"github.com/Azure/go-amqp"
|
||||
)
|
||||
|
@ -47,6 +49,8 @@ const (
|
|||
KeyOfNoPartitionKey = "NoPartitionKey"
|
||||
)
|
||||
|
||||
var ErrMessageIsTooBig = errors.New("message is too big")
|
||||
|
||||
// BatchWithMaxSizeInBytes configures the EventBatchIterator to fill the batch to the specified max size in bytes
|
||||
func BatchWithMaxSizeInBytes(sizeInBytes int) BatchOption {
|
||||
return func(batchOption *BatchOptions) error {
|
||||
|
@ -106,7 +110,7 @@ func (ebi *EventBatchIterator) Next(eventID string, opts *BatchOptions) (*EventB
|
|||
}
|
||||
}
|
||||
|
||||
events := ebi.PartitionEventsMap[key]
|
||||
events := ebi.PartitionEventsMap[key][ebi.Cursors[key]:]
|
||||
eb := NewEventBatch(eventID, opts)
|
||||
if key != KeyOfNoPartitionKey && len(events) > 0 {
|
||||
eb.PartitionKey = events[0].PartitionKey
|
||||
|
@ -118,6 +122,11 @@ func (ebi *EventBatchIterator) Next(eventID string, opts *BatchOptions) (*EventB
|
|||
}
|
||||
|
||||
if !ok {
|
||||
if len(eb.marshaledMessages) == 0 {
|
||||
ebi.Cursors[key]++
|
||||
return nil, ErrMessageIsTooBig
|
||||
}
|
||||
|
||||
return eb, nil
|
||||
}
|
||||
ebi.Cursors[key]++
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
package eventhub_test
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/Azure/azure-event-hubs-go/v3"
|
||||
eventhub "github.com/Azure/azure-event-hubs-go/v3"
|
||||
)
|
||||
|
||||
func TestNewEventBatch(t *testing.T) {
|
||||
|
@ -28,6 +29,7 @@ func TestEventBatch_AddManyMessages(t *testing.T) {
|
|||
ok, err := eb.Add(event)
|
||||
assert.True(t, ok)
|
||||
assert.NoError(t, err)
|
||||
|
||||
msgSize := eb.Size() - wrapperSize
|
||||
|
||||
limit := ((int(eb.MaxSize) - 100) / msgSize) - 1
|
||||
|
@ -52,3 +54,56 @@ func TestEventBatch_Clear(t *testing.T) {
|
|||
eb.Clear()
|
||||
assert.Equal(t, 100, eb.Size())
|
||||
}
|
||||
|
||||
func TestHugeBatches(t *testing.T) {
|
||||
data := make([]byte, 500)
|
||||
events := make([]*eventhub.Event, 0)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
// 100 / 4 * 50000 = 1250000 bytes per partition
|
||||
partitionKey := strconv.Itoa(i % 4)
|
||||
evt := &eventhub.Event{
|
||||
Data: data,
|
||||
PartitionKey: &partitionKey,
|
||||
}
|
||||
|
||||
events = append(events, evt)
|
||||
}
|
||||
|
||||
opts := &eventhub.BatchOptions{
|
||||
MaxSize: 10000,
|
||||
}
|
||||
iter := eventhub.NewEventBatchIterator(events...)
|
||||
iterCount := 0
|
||||
|
||||
for !iter.Done() {
|
||||
_, err := iter.Next("batchId", opts)
|
||||
assert.NoError(t, err)
|
||||
|
||||
iterCount++
|
||||
|
||||
if iterCount > 101 {
|
||||
assert.Fail(t, "Too much iteration")
|
||||
}
|
||||
}
|
||||
|
||||
assert.Greater(t, iterCount, 5)
|
||||
}
|
||||
|
||||
func TestOneHugeEvent(t *testing.T) {
|
||||
data := make([]byte, 1100)
|
||||
events := []*eventhub.Event{
|
||||
{
|
||||
Data: data,
|
||||
},
|
||||
}
|
||||
opts := &eventhub.BatchOptions{
|
||||
MaxSize: 1000,
|
||||
}
|
||||
iter := eventhub.NewEventBatchIterator(events...)
|
||||
|
||||
for !iter.Done() {
|
||||
_, err := iter.Next("batchId", opts)
|
||||
assert.Equal(t, err, eventhub.ErrMessageIsTooBig)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
# Change Log
|
||||
|
||||
## `v3.3.3`
|
||||
- EventBatchIterator drops messages which bigger than 1MB with an error
|
||||
|
||||
## `v3.3.2`
|
||||
- passing a context to internal calls that use go-amqp that now expect a context
|
||||
- updating dependencies in go.mod
|
||||
|
@ -50,7 +53,7 @@
|
|||
- cleanup connection after making management request
|
||||
|
||||
## `v1.3.0`
|
||||
- add `SystemProperties` to `Event` which contains immutable broker provided metadata (squence number, offset,
|
||||
- add `SystemProperties` to `Event` which contains immutable broker provided metadata (squence number, offset,
|
||||
enqueued time)
|
||||
|
||||
## `v1.2.0`
|
||||
|
@ -63,11 +66,11 @@
|
|||
- update to amqp 0.11.0 and change sender to use unsettled rather than receiver second mode
|
||||
|
||||
## `v1.1.3`
|
||||
- fix leak in partition persistence
|
||||
- fix leak in partition persistence
|
||||
- fix discarding event properties on batch sending
|
||||
|
||||
## `v1.1.2`
|
||||
- take dep on updated amqp common which has more permissive RPC status description parsing
|
||||
- take dep on updated amqp common which has more permissive RPC status description parsing
|
||||
|
||||
## `v1.1.1`
|
||||
- close sender when hub is closed
|
||||
|
|
Загрузка…
Ссылка в новой задаче