do not try to recover sender errors that are client errors
This commit is contained in:
Родитель
286bf0c63f
Коммит
bd2438a5dd
|
@ -136,7 +136,7 @@ func (em *entityManager) Execute(ctx context.Context, method string, entityPath
|
|||
if err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
}
|
||||
|
||||
|
||||
if res != nil {
|
||||
applyResponseInfo(span, res)
|
||||
}
|
||||
|
|
11
hub_test.go
11
hub_test.go
|
@ -258,6 +258,7 @@ func (suite *eventHubSuite) TestSasToken() {
|
|||
func (suite *eventHubSuite) TestPartitioned() {
|
||||
tests := map[string]func(context.Context, *testing.T, *Hub, string){
|
||||
"TestSend": testBasicSend,
|
||||
"TestSendTooBig": testSendTooBig,
|
||||
"TestSendAndReceive": testBasicSendAndReceive,
|
||||
"TestBatchSendAndReceive": testBatchSendAndReceive,
|
||||
}
|
||||
|
@ -281,7 +282,15 @@ func (suite *eventHubSuite) TestPartitioned() {
|
|||
|
||||
func testBasicSend(ctx context.Context, t *testing.T, client *Hub, _ string) {
|
||||
err := client.Send(ctx, NewEventFromString("Hello!"))
|
||||
assert.Nil(t, err)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func testSendTooBig(ctx context.Context, t *testing.T, client *Hub, _ string) {
|
||||
data := make([]byte, 256*1024)
|
||||
_, _ = rand.Read(data)
|
||||
event := NewEvent(data)
|
||||
err := client.Send(ctx, event)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func testBatchSendAndReceive(ctx context.Context, t *testing.T, client *Hub, partitionID string) {
|
||||
|
|
39
sender.go
39
sender.go
|
@ -25,6 +25,7 @@ package eventhub
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go"
|
||||
|
@ -138,6 +139,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
|
|||
return ctx.Err()
|
||||
default:
|
||||
// try to recover the connection
|
||||
count := 0
|
||||
_, retryErr := common.Retry(10, 5*time.Second, func() (interface{}, error) {
|
||||
sp, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.trySend.tryRecover")
|
||||
defer sp.Finish()
|
||||
|
@ -150,24 +152,31 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
|
|||
amqpErr.Condition == "com.microsoft:timeout" {
|
||||
|
||||
log.For(ctx).Debug(amqpErr.Error())
|
||||
time.Sleep(2 * time.Second) // delay send for a moment due to server busy
|
||||
return nil, common.Retryable(amqpErr.Error())
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Duration(2*rand.Intn(1000)/1000) * time.Second) // delay send for a moment due to server busy
|
||||
|
||||
err := s.Recover(ctx)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// context is done, so return
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
if err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
return nil, common.Retryable(err.Error())
|
||||
if count >= 1 {
|
||||
// we've been here before, try to recover the connection
|
||||
err := s.Recover(ctx)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// context is done, so return
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
if err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
return nil, common.Retryable(err.Error())
|
||||
}
|
||||
log.For(ctx).Debug("recovered the connection")
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
count++
|
||||
// connection should still be good
|
||||
return nil, nil
|
||||
}
|
||||
log.For(ctx).Debug("recovered the connection")
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
|
|
Загрузка…
Ссылка в новой задаче