Only retry with retryable amqp errors for sender
This commit is contained in:
Родитель
705d23958e
Коммит
9d82129c2e
17
changelog.md
17
changelog.md
|
@ -1,5 +1,22 @@
|
|||
# Change Log
|
||||
|
||||
## `v0.10.8`
|
||||
- only retry with retryable amqp errors for sender [#201](https://github.com/Azure/azure-service-bus-go/issues/201)
|
||||
|
||||
## `v0.10.7`
|
||||
- add AzureEnvironment namespace option and use its definition [#192](https://github.com/Azure/azure-service-bus-go/issues/192)
|
||||
- fix for the WebSocket-behind-proxy issue [#196](https://github.com/Azure/azure-service-bus-go/issues/196)
|
||||
- fix nil error dereference [#199](https://github.com/Azure/azure-service-bus-go/issues/199)
|
||||
|
||||
## `v0.10.6`
|
||||
- fix a hang when closing a receiver
|
||||
|
||||
## `v0.10.5`
|
||||
- recover must rebuild the link atomically [#187](https://github.com/Azure/azure-service-bus-go/issues/187)
|
||||
|
||||
## `v0.10.4`
|
||||
- updates dependencies to their latest versions
|
||||
|
||||
## `v0.10.3`
|
||||
- Implements DefaultRuleDescription to allow setting a default rule for a subscription.
|
||||
|
||||
|
|
17
errors.go
17
errors.go
|
@ -3,8 +3,25 @@ package servicebus
|
|||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/v3/rpc"
|
||||
"github.com/Azure/go-amqp"
|
||||
)
|
||||
|
||||
// Error Conditions
|
||||
const (
|
||||
// Service Bus Errors
|
||||
errorServerBusy amqp.ErrorCondition = "com.microsoft:server-busy"
|
||||
errorTimeout amqp.ErrorCondition = "com.microsoft:timeout"
|
||||
errorOperationCancelled amqp.ErrorCondition = "com.microsoft:operation-cancelled"
|
||||
errorContainerClose amqp.ErrorCondition = "com.microsoft:container-close"
|
||||
)
|
||||
|
||||
// Retry tuning used when the sender recovers after an AMQP error.
const (
	// amqpRetryDefaultTimes is the attempt count handed to the retry helper.
	amqpRetryDefaultTimes = int(3)
	// amqpRetryDefaultDelay is the delay used for every retryable condition
	// other than server-busy.
	amqpRetryDefaultDelay = time.Second
	// amqpRetryBusyServerDelay is the longer delay used when the service
	// reports the server-busy condition.
	amqpRetryBusyServerDelay = 10 * time.Second
)
|
||||
|
||||
type (
|
||||
|
|
77
sender.go
77
sender.go
|
@ -24,6 +24,7 @@ package servicebus
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -234,31 +235,10 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {
|
|||
|
||||
switch err.(type) {
|
||||
case *amqp.Error, *amqp.DetachError:
|
||||
tab.For(ctx).Debug("recovering connection")
|
||||
_, retryErr := common.Retry(10, 10*time.Second, func() (interface{}, error) {
|
||||
ctx, sp := s.startProducerSpanFromContext(ctx, "sb.Sender.trySend.tryRecover")
|
||||
defer sp.End()
|
||||
|
||||
err := s.Recover(ctx)
|
||||
if err == nil {
|
||||
tab.For(ctx).Debug("recovered connection")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
return nil, common.Retryable(err.Error())
|
||||
}
|
||||
})
|
||||
|
||||
if retryErr != nil {
|
||||
tab.For(ctx).Debug("sender recovering retried, but error was unrecoverable")
|
||||
if err := s.Close(ctx); err != nil {
|
||||
tab.For(ctx).Error(err)
|
||||
}
|
||||
return retryErr
|
||||
err = s.handleAMQPError(ctx, err)
|
||||
if err != nil {
|
||||
tab.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
default:
|
||||
tab.For(ctx).Error(err)
|
||||
|
@ -268,6 +248,53 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {
|
|||
}
|
||||
}
|
||||
|
||||
// handleAMQPError is called internally when an event has failed to send so we
|
||||
// can parse the error to determine whether we should attempt to retry sending the event again.
|
||||
func (s *Sender) handleAMQPError(ctx context.Context, err error) error {
|
||||
var amqpError *amqp.Error
|
||||
if errors.As(err, &amqpError) {
|
||||
switch amqpError.Condition {
|
||||
case errorServerBusy:
|
||||
return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryBusyServerDelay)
|
||||
case errorTimeout:
|
||||
return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
|
||||
case errorOperationCancelled:
|
||||
return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
|
||||
case errorContainerClose:
|
||||
return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
|
||||
}
|
||||
|
||||
func (s *Sender) retryRetryableAmqpError(ctx context.Context, times int, delay time.Duration) error {
|
||||
tab.For(ctx).Debug("recovering sender connection")
|
||||
_, retryErr := common.Retry(times, delay, func() (interface{}, error) {
|
||||
ctx, sp := s.startProducerSpanFromContext(ctx, "sb.Sender.trySend.tryRecover")
|
||||
defer sp.End()
|
||||
|
||||
err := s.Recover(ctx)
|
||||
if err == nil {
|
||||
tab.For(ctx).Debug("recovered connection")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
return nil, common.Retryable(err.Error())
|
||||
}
|
||||
})
|
||||
if retryErr != nil {
|
||||
tab.For(ctx).Debug("sender recovering retried, but error was unrecoverable")
|
||||
return retryErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Sender) connClosedError(ctx context.Context) error {
|
||||
name := "Sender"
|
||||
if s.Name != "" {
|
||||
|
|
Загрузка…
Ссылка в новой задаче