[azservicebus, azeventhubs] Fixing a bug where our management link would hang in certain circumstances (#23400)

Fixing a bug where we could end up getting our management link stuck.

The problem was basically a combination of errors:
1. We were missing the sender settlement mode, which meant we were only halfway in the "receive and delete" mode that we intended to be in.
2. Ignoring that, our AcceptMessage() call was actually load-bearing and critical, and _wasn't_ getting called in some cases, such as when the call was cancelled.
This commit is contained in:
Richard Park 2024-09-05 13:54:19 -07:00 коммит произвёл GitHub
Родитель cc560d64d4
Коммит f0caa1eee6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
7 изменённых файлов: 55 добавлений и 9 удалений

Просмотреть файл

@ -1,5 +1,11 @@
# Release History
## 1.2.3 (Unreleased)
### Bugs Fixed
- Fixed a bug where cancelling RenewMessageLock() calls could cause hangs in future RenewMessageLock calls. (PR#23400)
## 1.2.2 (2024-08-15)
### Bugs Fixed

Просмотреть файл

@ -4,4 +4,4 @@
package internal
// Version is the semantic version number
const Version = "v1.2.2"
const Version = "v1.2.3"

Просмотреть файл

@ -125,6 +125,10 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (amqpwrap.RPCLink, error)
receiverOpts := &amqp.ReceiverOptions{
TargetAddress: link.clientAddress,
Credit: defaultReceiverCredits,
// set our receiver link into the "receive and delete" mode - messages arrive pre-settled.
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(),
}
if link.sessionID != nil {
@ -306,10 +310,6 @@ func (l *rpcLink) internalRPC(ctx context.Context, msg *amqp.Message) (*amqpwrap
Message: res,
}
if err := l.receiver.AcceptMessage(ctx, res); err != nil {
return response, fmt.Errorf("failed accepting message on rpc link: %w", err)
}
var rpcErr RPCError
if asRPCError(response, &rpcErr) {

Просмотреть файл

@ -404,6 +404,23 @@ func TestRPCLinkCancelClientSideWait(t *testing.T) {
}
// TestRPCLinkUsesCorrectFlags verifies that the RPC link creates its receiver
// in "receive and delete" mode: messages arrive pre-settled
// (ReceiverSettleModeFirst) and the sender is asked to settle on send
// (SenderSettleModeSettled).
func TestRPCLinkUsesCorrectFlags(t *testing.T) {
	ctx := context.Background()
	mock := NewRPCTester(t)

	link, err := NewRPCLink(ctx, RPCLinkArgs{
		Client:   &rpcTesterClient{session: mock},
		Address:  "some-address",
		LogEvent: "rpctesting",
	})
	require.NoError(t, err)
	require.NoError(t, link.Close(ctx))

	// NewRPCLink must have passed these options through to NewReceiver.
	opts := mock.receiverOpts
	require.Equal(t, amqp.ReceiverSettleModeFirst, *opts.SettlementMode)
	require.Equal(t, amqp.SenderSettleModeSettled, *opts.RequestedSenderSettleMode)
}
func NewRPCTester(t *testing.T) *rpcTester {
return &rpcTester{t: t,
ResponsesCh: make(chan *rpcTestResp, 1000),
@ -421,6 +438,7 @@ func NewRPCTester(t *testing.T) *rpcTester {
type rpcTester struct {
amqpwrap.AMQPSenderCloser
amqpwrap.AMQPReceiverCloser
receiverOpts *amqp.ReceiverOptions
// Accepted contains all the messages where we called AcceptMessage(msg)
// We only call this when we
@ -465,6 +483,7 @@ func (c *rpcTesterClient) NewSession(ctx context.Context, opts *amqp.SessionOpti
// Close implements the client closer on the test double. The tester holds no
// real connection, so there is nothing to release and it always succeeds.
func (c *rpcTesterClient) Close() error { return nil }
// NewReceiver captures the receiver options the RPC link requested (so tests
// can assert on the settlement modes) and returns the tester itself as the
// receiver.
func (tester *rpcTester) NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error) {
	// Saved for later inspection, e.g. by TestRPCLinkUsesCorrectFlags.
	tester.receiverOpts = opts
	return tester, nil
}

Просмотреть файл

@ -8,6 +8,8 @@
### Bugs Fixed
- Fixed a bug where cancelling RenewMessageLock() calls could cause hangs in future RenewMessageLock calls. (PR#23400)
### Other Changes
## 1.7.1 (2024-05-20)

Просмотреть файл

@ -121,6 +121,10 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (amqpwrap.RPCLink, error)
receiverOpts := &amqp.ReceiverOptions{
TargetAddress: link.clientAddress,
Credit: defaultReceiverCredits,
// set our receiver link into the "receive and delete" mode - messages arrive pre-settled.
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(),
}
if link.sessionID != nil {
@ -286,10 +290,6 @@ func (l *rpcLink) RPC(ctx context.Context, msg *amqp.Message) (*amqpwrap.RPCResp
Message: res,
}
if err := l.receiver.AcceptMessage(ctx, res); err != nil {
return response, fmt.Errorf("failed accepting message on rpc link: %w", err)
}
var rpcErr RPCError
if asRPCError(response, &rpcErr) {

Просмотреть файл

@ -244,6 +244,23 @@ func TestRPCLinkClosingQuickly(t *testing.T) {
require.NoError(t, link.Close(context.Background()))
}
// TestRPCLinkUsesCorrectFlags verifies that the RPC link opens its receiver
// in "receive and delete" mode: messages arrive pre-settled
// (ReceiverSettleModeFirst) and the sender is asked to settle on send
// (SenderSettleModeSettled).
func TestRPCLinkUsesCorrectFlags(t *testing.T) {
	ctx := context.Background()

	tester := &rpcTester{
		t:           t,
		ResponsesCh: make(chan *rpcTestResp, 1000),
		Accepted:    make(chan *amqp.Message, 1),
	}

	link, err := NewRPCLink(ctx, RPCLinkArgs{
		Client:   &rpcTesterClient{session: tester},
		Address:  "some-address",
		LogEvent: "rpctesting",
	})
	require.NoError(t, err)
	require.NoError(t, link.Close(ctx))

	// NewRPCLink must have passed these options through to NewReceiver.
	opts := tester.receiverOpts
	require.Equal(t, amqp.ReceiverSettleModeFirst, *opts.SettlementMode)
	require.Equal(t, amqp.SenderSettleModeSettled, *opts.RequestedSenderSettleMode)
}
// rpcTester has all the functions needed (for our RPC tests) to be:
// - an AMQPSession
// - an AMQPReceiverCloser
@ -253,6 +270,7 @@ func TestRPCLinkClosingQuickly(t *testing.T) {
type rpcTester struct {
amqpwrap.AMQPSenderCloser
amqpwrap.AMQPReceiverCloser
receiverOpts *amqp.ReceiverOptions
// Accepted contains all the messages where we called AcceptMessage(msg)
// We only call this when we
@ -281,6 +299,7 @@ func (c *rpcTesterClient) NewSession(ctx context.Context, opts *amqp.SessionOpti
// Close implements the client closer on the test double. The tester holds no
// real connection, so there is nothing to release and it always succeeds.
func (c *rpcTesterClient) Close() error { return nil }
// NewReceiver captures the receiver options the RPC link requested (so tests
// can assert on the settlement modes) and returns the tester itself as the
// receiver.
func (tester *rpcTester) NewReceiver(ctx context.Context, source string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error) {
	// Saved for later inspection, e.g. by TestRPCLinkUsesCorrectFlags.
	tester.receiverOpts = opts
	return tester, nil
}