rework sender recovery
This commit is contained in:
Родитель
800295c8d7
Коммит
9a085870f4
|
@ -120,7 +120,7 @@ func (s *testSuite) TestMultiple() {
|
|||
processors := make(map[string]*EventProcessorHost, numPartitions)
|
||||
processorNames := make([]string, numPartitions)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout * 2)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout*2)
|
||||
defer cancel()
|
||||
for i := 0; i < numPartitions; i++ {
|
||||
processor, err := s.newInMemoryEPHWithOptions(*hub.Name, sharedStore)
|
||||
|
|
82
sender.go
82
sender.go
|
@ -30,7 +30,6 @@ import (
|
|||
"github.com/Azure/azure-amqp-common-go"
|
||||
"github.com/Azure/azure-amqp-common-go/log"
|
||||
"github.com/Azure/azure-amqp-common-go/uuid"
|
||||
"github.com/Azure/azure-event-hubs-go/internal"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"pack.ag/amqp"
|
||||
)
|
||||
|
@ -115,50 +114,67 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
|
|||
sp, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.trySend")
|
||||
defer sp.Finish()
|
||||
|
||||
times := 3
|
||||
delay := 10 * time.Second
|
||||
durationOfSend := 3 * time.Second
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
times = int(time.Until(deadline) / (delay + durationOfSend))
|
||||
times = ehmath.Max(times, 1) // give at least one chance at sending
|
||||
err := opentracing.GlobalTracer().Inject(sp.Context(), opentracing.TextMap, evt)
|
||||
if err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
_, err := common.Retry(times, delay, func() (interface{}, error) {
|
||||
sp, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.trySend.transmit")
|
||||
defer sp.Finish()
|
||||
|
||||
msg := evt.toMsg()
|
||||
sp.SetTag("eh.message-id", msg.Properties.MessageID)
|
||||
|
||||
// try as long as the context is not dead
|
||||
for {
|
||||
err = s.sender.Send(ctx, msg)
|
||||
if err == nil {
|
||||
// successful send
|
||||
return err
|
||||
}
|
||||
|
||||
// error recovery
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
// context is done, so return
|
||||
return ctx.Err()
|
||||
default:
|
||||
innerCtx, cancel := context.WithTimeout(ctx, durationOfSend)
|
||||
defer cancel()
|
||||
// try to recover the connection
|
||||
_, retryErr := common.Retry(10, 5*time.Second, func() (interface{}, error) {
|
||||
sp, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.trySend.tryRecover")
|
||||
defer sp.Finish()
|
||||
|
||||
err := opentracing.GlobalTracer().Inject(sp.Context(), opentracing.TextMap, evt)
|
||||
if err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
return nil, err
|
||||
}
|
||||
if amqpErr, ok := err.(*amqp.Error); ok {
|
||||
// retry on known errors
|
||||
if amqpErr.Condition == "com.microsoft:server-busy" ||
|
||||
amqpErr.Condition == "com.microsoft:operation-cancelled" ||
|
||||
amqpErr.Condition == "com.microsoft:entity-moved" ||
|
||||
amqpErr.Condition == "com.microsoft:timeout" {
|
||||
|
||||
msg := evt.toMsg()
|
||||
sp.SetTag("eh.message-id", msg.Properties.MessageID)
|
||||
err = s.sender.Send(innerCtx, msg)
|
||||
if err != nil {
|
||||
recoverErr := s.Recover(ctx)
|
||||
if recoverErr != nil {
|
||||
log.For(ctx).Error(recoverErr)
|
||||
log.For(ctx).Debug(amqpErr.Error())
|
||||
return nil, common.Retryable(amqpErr.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if amqpErr, ok := err.(*amqp.Error); ok {
|
||||
if amqpErr.Condition == "com.microsoft:server-busy" {
|
||||
return nil, common.Retryable(amqpErr.Condition)
|
||||
err := s.Recover(ctx)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// context is done, so return
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
log.For(ctx).Error(err)
|
||||
if err != nil {
|
||||
return nil, common.Retryable(err.Error())
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return nil, err
|
||||
if retryErr != nil {
|
||||
log.For(ctx).Debug("retried, but error was unrecoverable")
|
||||
log.For(ctx).Error(retryErr)
|
||||
return retryErr
|
||||
}
|
||||
}
|
||||
})
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sender) String() string {
|
||||
|
|
|
@ -86,7 +86,7 @@ func (ts *testSuite) TestSingle() {
|
|||
}
|
||||
|
||||
func (ts *testSuite) TestMultiple() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout * 2)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout*2)
|
||||
defer cancel()
|
||||
|
||||
hub, delHub, err := ts.RandomHub()
|
||||
|
|
Загрузка…
Ссылка в новой задаче