*amqp.Sender renamed ID() to LinkName() (#238)
This commit is contained in:
Родитель
96e6508b93
Коммит
c168808e24
12
sender.go
12
sender.go
|
@ -69,7 +69,7 @@ type (
|
|||
// (used for testing)
|
||||
// Implemented by *amqp.Sender
|
||||
amqpSender interface {
|
||||
ID() string
|
||||
LinkName() string
|
||||
Send(ctx context.Context, msg *amqp.Message) error
|
||||
Close(ctx context.Context) error
|
||||
}
|
||||
|
@ -140,7 +140,7 @@ func (s *sender) recoverWithExpectedLinkID(ctx context.Context, expectedLinkID s
|
|||
|
||||
// if the link they started with has already been closed and removed we don't
|
||||
// need to trigger an additional recovery.
|
||||
if expectedLinkID != "" && s.amqpSender().ID() != expectedLinkID {
|
||||
if expectedLinkID != "" && s.amqpSender().LinkName() != expectedLinkID {
|
||||
tab.For(ctx).Debug("original linkID does not match, no recovery necessary")
|
||||
} else if !s.recovering {
|
||||
// another goroutine isn't recovering, so this one will
|
||||
|
@ -307,18 +307,18 @@ func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries in
|
|||
switch e := err.(type) {
|
||||
case *amqp.Error:
|
||||
if e.Condition == errorServerBusy || e.Condition == errorTimeout {
|
||||
recoverLink(sender.ID(), err, false)
|
||||
recoverLink(sender.LinkName(), err, false)
|
||||
break
|
||||
}
|
||||
recoverLink(sender.ID(), err, true)
|
||||
recoverLink(sender.LinkName(), err, true)
|
||||
case *amqp.DetachError, net.Error:
|
||||
recoverLink(sender.ID(), err, true)
|
||||
recoverLink(sender.LinkName(), err, true)
|
||||
default:
|
||||
if !isRecoverableCloseError(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
recoverLink(sender.ID(), err, true)
|
||||
recoverLink(sender.LinkName(), err, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ type recoveryCall struct {
|
|||
recover bool
|
||||
}
|
||||
|
||||
func (s *testAmqpSender) ID() string {
|
||||
func (s *testAmqpSender) LinkName() string {
|
||||
return "sender-id"
|
||||
}
|
||||
|
||||
|
@ -347,6 +347,11 @@ func TestRecoveryBlock1(t *testing.T) {
|
|||
// })
|
||||
}
|
||||
|
||||
func TestAMQPSenderIsCompatibleWithInterface(t *testing.T) {
|
||||
var validateCompile amqpSender = &amqp.Sender{}
|
||||
require.NotNil(t, validateCompile)
|
||||
}
|
||||
|
||||
type fakeSender struct {
|
||||
id string
|
||||
closed bool
|
||||
|
|
Загрузка…
Ссылка в новой задаче