[azeventhubs,azservicebus] Applying retry fix to both eventhubs and servicebus (#23562)

* Fixed some lint errors.
* Fixed some broken tests. It looks like these weren't running in CI, since what I've changed should not have affected them at all.

Fixes #23523
Richard Park 2024-10-10 19:52:47 -07:00 committed by GitHub
Parent 5df372ce81
Commit 42922c1d30
No key found matching this signature
GPG key ID: B5690EEEBB952194
17 changed files: 167 additions and 233 deletions
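For context on the fix itself: the old `calcDelay` computed `(2^try - 1) * RetryDelay` with wrapping `int64` arithmetic, so a large enough attempt count (or base delay) overflowed and produced a bogus, often negative, sleep duration. A minimal standalone sketch of the wraparound, with illustrative values only (this is not code from the commit):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	retryDelay := time.Second

	for _, try := range []int32{5, 32, 62, 64} {
		// Pre-fix style calculation: (2^try - 1) * retryDelay in int64 math.
		// Once the true product exceeds math.MaxInt64, the result wraps.
		raw := time.Duration(int64(1)<<try-1) * retryDelay
		fmt.Printf("try=%2d delay=%v wrapped=%v\n", try, raw, raw < 0)
	}
}
```

The fix (see the `calcDelay` hunks below) clamps the exponential factor at `math.MaxInt64`, re-checks for overflow after the multiply, and performs the jitter math in `float64`, so the delay saturates instead of wrapping.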

View file

@@ -99,11 +99,11 @@
},
{
"Name": "messaging/azservicebus",
"CoverageGoal": 0.48
"CoverageGoal": 0.39
},
{
"Name": "messaging/azeventhubs",
"CoverageGoal": 0.60
"CoverageGoal": 0.45
},
{
"Name": "messaging/eventgrid/aznamespaces",
@@ -134,4 +134,4 @@
"CoverageGoal": 0.75
}
]
}

View file

@@ -1,10 +1,11 @@
# Release History
## 1.2.3 (Unreleased)
## 1.2.3 (2024-10-14)
### Bugs Fixed
- Fixed a bug where cancelling RenewMessageLock() calls could cause hangs in future RenewMessageLock calls. (PR#23400)
- Fixed a bug where cancelling management link calls, such as GetEventHubProperties() or GetPartitionProperties(), could result in blocked calls. (PR#23400)
- Apply fix from @bcho for overflows with retries. (PR#23562)
## 1.2.2 (2024-08-15)
@@ -23,9 +24,10 @@
### Bugs Fixed
Processor.Run had unclear behavior for some cases:
- Run() now returns an explicit error when called more than once on a single
Processor instance or if multiple Run calls are made concurrently. (PR#22833)
- NextProcessorClient now properly terminates (and returns nil) if called on a
stopped Processor. (PR#22833)
## 1.1.0 (2024-04-02)

View file

@@ -942,7 +942,7 @@ func sendEventToPartition(t *testing.T, producer *azeventhubs.ProducerClient, pa
eventToSend.Properties = props
err = batch.AddEventData(event, nil)
err = batch.AddEventData(&eventToSend, nil)
require.NoError(t, err)
}

View file

@@ -6,7 +6,6 @@ package internal
import (
"context"
"fmt"
"net"
"testing"
"time"
@@ -113,6 +112,8 @@ func TestLinksRecoverLinkWithConnectionFailureAndExpiredContext(t *testing.T) {
defer test.RequireClose(t, links)
defer test.RequireNSClose(t, ns)
t.Logf("Getting links (original), manually")
oldLWID, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)
@@ -123,32 +124,44 @@
err = origConn.Close()
require.NoError(t, err)
err = oldLWID.Link().Send(context.Background(), &amqp.Message{}, nil)
require.Error(t, err)
require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))
// Try to recover, but using an expired context. We'll get a network error (not enough time to resolve or
// create a connection), which would normally be a connection level recovery event.
cancelledCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
defer cancel()
err = links.lr.RecoverIfNeeded(cancelledCtx, lwidToError(err, oldLWID))
var netErr net.Error
require.ErrorAs(t, err, &netErr)
t.Logf("Sending message, within retry loop, with an already expired context")
// now recover like normal
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(err, oldLWID))
require.NoError(t, err)
err = links.Retry(cancelledCtx, "(expired context) retry loop with precancelled context", "send", "0", exported.RetryOptions{}, func(ctx context.Context, lwid LinkWithID[amqpwrap.AMQPSenderCloser]) error {
// ignoring the cancelled context, let's see what happens.
t.Logf("(expired context) Sending message")
err = lwid.Link().Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("(expired context) hello world")},
}, nil)
newLWID, err := links.GetLink(context.Background(), "0")
t.Logf("(expired context) Message sent, error: %#v", err)
return err
})
require.ErrorIs(t, err, context.DeadlineExceeded)
t.Logf("Sending message, within retry loop, NO expired context")
var newLWID LinkWithID[amqpwrap.AMQPSenderCloser]
err = links.Retry(context.Background(), "(normal) retry loop without cancelled context", "send", "0", exported.RetryOptions{}, func(ctx context.Context, lwid LinkWithID[amqpwrap.AMQPSenderCloser]) error {
// ignoring the cancelled context, let's see what happens.
t.Logf("(normal) Sending message")
err = lwid.Link().Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("hello world")},
}, nil)
t.Logf("(normal) Message sent, error: %#v", err)
newLWID = lwid
return err
})
require.NoError(t, err)
requireNewLinkNewConn(t, oldLWID, newLWID)
err = newLWID.Link().Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("hello world")},
}, nil)
require.NoError(t, err)
require.Equal(t, newLWID.ConnID(), uint64(2), "we should have recovered the connection")
}
func TestLinkFailureWhenConnectionIsDead(t *testing.T) {

View file

@@ -37,8 +37,6 @@ func SetupRPC(sender *MockAMQPSenderCloser, receiver *MockAMQPReceiverCloser, ex
CorrelationID: sentMessage.Properties.MessageID,
},
}
receiver.EXPECT().AcceptMessage(gomock.Any(), gomock.Any()).Return(nil)
// let the caller fill in the blanks of whatever needs to happen here.
handler(sentMessage, response)
return response, nil

View file

@@ -181,9 +181,6 @@ func TestRPCLinkNonErrorRequiresNoRecovery(t *testing.T) {
require.Equal(t, 200, resp.Code)
require.Equal(t, "response from service", resp.Message.Value)
acceptedMessage := <-tester.Accepted
require.Equal(t, "response from service", acceptedMessage.Value, "successfully received message is accepted")
require.NoError(t, link.Close(context.Background()))
logMessages := getLogs()
@@ -219,9 +216,6 @@ func TestRPCLinkNonErrorLockLostDoesNotBreakAnything(t *testing.T) {
require.ErrorAs(t, err, &rpcErr)
require.Equal(t, 400, rpcErr.RPCCode())
acceptedMessage := <-tester.Accepted
require.Equal(t, "response from service", acceptedMessage.Value, "successfully received message is accepted")
// validate that a normal error doesn't cause the response router to shut down
resp, err = link.RPC(context.Background(), &amqp.Message{
ApplicationProperties: map[string]any{
@@ -232,8 +226,6 @@
})
require.NoError(t, err)
require.Equal(t, "response from service", resp.Message.Value)
acceptedMessage = <-tester.Accepted
require.Equal(t, "response from service", acceptedMessage.Value, "successfully received message is accepted")
}
func TestRPCLinkClosingClean_SessionCreationFailed(t *testing.T) {
@@ -424,7 +416,6 @@ func TestRPCLinkUsesCorrectFlags(t *testing.T) {
func NewRPCTester(t *testing.T) *rpcTester {
return &rpcTester{t: t,
ResponsesCh: make(chan *rpcTestResp, 1000),
Accepted: make(chan *amqp.Message, 1),
RPCLoopStarted: make(chan struct{}, 1),
}
}
@@ -440,9 +431,6 @@ type rpcTester struct {
amqpwrap.AMQPReceiverCloser
receiverOpts *amqp.ReceiverOptions
// Accepted contains all the messages where we called AcceptMessage(msg)
// We only call this when we
Accepted chan *amqp.Message
ResponsesCh chan *rpcTestResp
t *testing.T
@@ -501,12 +489,6 @@ func (tester *rpcTester) LinkName() string {
// receiver functions
func (tester *rpcTester) AcceptMessage(ctx context.Context, msg *amqp.Message) error {
require.NotNil(tester.t, tester.Accepted, "No messages should be AcceptMessage()'d since the tester.Accepted channel was nil")
tester.Accepted <- msg
return nil
}
func (tester *rpcTester) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) {
tester.closeRPCLoopStarted.Do(func() {
close(tester.RPCLoopStarted)

View file

@@ -108,7 +108,7 @@ func parseSig(sigStr string) (*sig, error) {
case "skn":
parsed.skn = keyValue[1]
default:
return nil, fmt.Errorf(fmt.Sprintf("unknown key / value: %q", keyValue))
return nil, fmt.Errorf("unknown key / value: %q", keyValue)
}
}
return parsed, nil

View file

@@ -114,25 +114,33 @@ func setDefaults(o *exported.RetryOptions) {
 }
 
 // (adapted from azcore/policy_retry)
-func calcDelay(o exported.RetryOptions, try int32) time.Duration {
-	if try == 0 {
-		return 0
-	}
-
-	pow := func(number int64, exponent int32) int64 { // pow is nested helper function
-		var result int64 = 1
-		for n := int32(0); n < exponent; n++ {
-			result *= number
-		}
-		return result
-	}
-
-	delay := time.Duration(pow(2, try)-1) * o.RetryDelay
-	// Introduce some jitter: [0.0, 1.0) / 2 = [0.0, 0.5) + 0.8 = [0.8, 1.3)
-	delay = time.Duration(delay.Seconds() * (rand.Float64()/2 + 0.8) * float64(time.Second)) // NOTE: We want math/rand; not crypto/rand
-	if delay > o.MaxRetryDelay {
+func calcDelay(o exported.RetryOptions, try int32) time.Duration { // try is >=1; never 0
+	// avoid overflow when shifting left
+	factor := time.Duration(math.MaxInt64)
+	if try < 63 {
+		factor = time.Duration(int64(1<<try) - 1)
+	}
+
+	delay := factor * o.RetryDelay
+	if delay < factor {
+		// overflow has happened so set to max value
+		delay = time.Duration(math.MaxInt64)
+	}
+
+	// Introduce jitter: [0.0, 1.0) / 2 = [0.0, 0.5) + 0.8 = [0.8, 1.3)
+	jitterMultiplier := rand.Float64()/2 + 0.8 // NOTE: We want math/rand; not crypto/rand
+
+	delayFloat := float64(delay) * jitterMultiplier
+	if delayFloat > float64(math.MaxInt64) {
+		// the jitter pushed us over MaxInt64, so just use MaxInt64
+		delay = time.Duration(math.MaxInt64)
+	} else {
+		delay = time.Duration(delayFloat)
+	}
+
+	if delay > o.MaxRetryDelay { // MaxRetryDelay is backfilled with non-negative value
 		delay = o.MaxRetryDelay
 	}
 
 	return delay
 }
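One note on the `delay < factor` guard above: it catches the common wraparound, where the overflowed product lands below `factor` (typically negative). An exact alternative is the division round-trip idiom, sketched below; the `mulSaturating` helper is hypothetical, not part of this PR:

```go
package retrysketch

import (
	"math"
	"time"
)

// mulSaturating multiplies two non-negative durations and clamps to
// MaxInt64 instead of wrapping. For b > 0, p/b != a holds exactly when
// a*b overflowed int64, so the round-trip division is a precise probe.
func mulSaturating(a, b time.Duration) time.Duration {
	if a == 0 || b == 0 {
		return 0
	}
	p := a * b
	if p/b != a {
		return time.Duration(math.MaxInt64)
	}
	return p
}
```

With a helper like this, `factor * o.RetryDelay` and the follow-up `delay < factor` check could collapse into a single `mulSaturating(factor, o.RetryDelay)` call.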

View file

@@ -91,9 +91,6 @@ func TestRetrier(t *testing.T) {
return errors.Is(err, context.Canceled)
}
customRetryOptions := fastRetryOptions
customRetryOptions.MaxRetries = 1
var actualAttempts []int32
maxRetries := int32(2)
@@ -276,7 +273,7 @@ func TestRetryDefaults(t *testing.T) {
require.EqualValues(t, time.Duration(0), ro.RetryDelay)
}
func TestCalcDelay(t *testing.T) {
func TestCalcDelay2(t *testing.T) {
// calcDelay introduces some jitter, automatically.
ro := exported.RetryOptions{}
setDefaults(&ro)
@@ -409,6 +406,74 @@ func TestRetryLogging(t *testing.T) {
})
}
func BenchmarkCalcDelay_defaultSettings(b *testing.B) {
retryOptions := exported.RetryOptions{}
setDefaults(&retryOptions)
for i := 0; i < b.N; i++ {
calcDelay(retryOptions, 32)
}
}
func BenchmarkCalcDelay_overflow(b *testing.B) {
retryOptions := exported.RetryOptions{
RetryDelay: 1,
MaxRetryDelay: math.MaxInt64,
}
setDefaults(&retryOptions)
for i := 0; i < b.N; i++ {
calcDelay(retryOptions, 100)
}
}
func TestCalcDelay(t *testing.T) {
requireWithinJitter := func(t testing.TB, expected, actual time.Duration) {
lower, upper := float64(expected)*0.8, float64(expected)*1.3
require.Truef(
t, float64(actual) >= lower && float64(actual) <= upper,
"%.2f not within jitter of %.2f", actual.Seconds(), expected.Seconds(),
)
}
t.Run("basic cases", func(t *testing.T) {
retryOptions := exported.RetryOptions{
RetryDelay: 1 * time.Second,
MaxRetryDelay: 30 * time.Second,
}
setDefaults(&retryOptions)
for i := int32(1); i <= 5; i++ {
delay := float64(calcDelay(retryOptions, i))
expected := float64((1<<i - 1) * int64(retryOptions.RetryDelay))
requireWithinJitter(
t, time.Duration(expected), time.Duration(delay),
)
}
for i := int32(6); i < 100; i++ {
require.Equal(
t,
calcDelay(retryOptions, i),
retryOptions.MaxRetryDelay,
)
}
})
t.Run("overflow", func(t *testing.T) {
retryOptions := exported.RetryOptions{
RetryDelay: 1,
MaxRetryDelay: math.MaxInt64,
}
setDefaults(&retryOptions)
for i := int32(63); i < 100000; i++ {
requireWithinJitter(
t, math.MaxInt64, calcDelay(retryOptions, i),
)
}
})
}
// retryRE is used to replace the 'retry time' with a consistent string to make
// unit tests against logging simpler
// A typical string: "[azsb.Retry] (retry) Attempt 1 sleeping for 1.10233ms"

View file

@@ -1,5 +1,11 @@
# Release History
## 1.7.3 (2024-10-14)
### Bugs Fixed
- Apply fix from @bcho for overflows with retries. (PR#23562)
## 1.7.2 (2024-09-11)
### Bugs Fixed
@@ -33,7 +39,7 @@
### Bugs Fixed
- Settling a message (using CompleteMessage, AbandonMessage, etc..) on a different Receiver instance than you received on no
longer leaks memory. (PR#22253)
## 1.5.0 (2023-10-10)
@@ -57,7 +63,7 @@
### Features Added
- `admin.SubscriptionProperties` now allow for a `DefaultRule` to be set. This allows Subscriptions to be created with an immediate filter/action.
Contributed by @StrawbrryFlurry. (PR#20888)
## 1.3.0 (2023-05-09)

View file

@@ -4,4 +4,4 @@
package internal
// Version is the semantic version number
const Version = "v1.7.2"
const Version = "v1.7.3"

View file

@@ -56,7 +56,7 @@ func TestRPCLinkNonErrorRequiresRecovery(t *testing.T) {
}
func TestRPCLinkNonErrorRequiresNoRecovery(t *testing.T) {
tester := &rpcTester{t: t, ResponsesCh: make(chan *rpcTestResp, 1000), Accepted: make(chan *amqp.Message, 1)}
tester := &rpcTester{t: t, ResponsesCh: make(chan *rpcTestResp, 1000)}
link, err := NewRPCLink(context.Background(), RPCLinkArgs{
Client: &rpcTesterClient{
@@ -97,16 +97,13 @@ func TestRPCLinkNonErrorRequiresNoRecovery(t *testing.T) {
require.Equal(t, 200, resp.Code)
require.Equal(t, "response from service", resp.Message.Value)
acceptedMessage := <-tester.Accepted
require.Equal(t, "response from service", acceptedMessage.Value, "successfully received message is accepted")
logMessages := cleanupLogs()
require.Contains(t, logMessages, "[rpctesting] RPCLink had no response channel for correlation ID you've-never-seen-this", "exampleUncorrelatedMessage causes warning for uncorrelated message")
require.Contains(t, logMessages, "[rpctesting] Non-fatal error in RPCLink, starting to receive again: *Error{Condition: com.microsoft:server-busy, Description: , Info: map[]}")
}
func TestRPCLinkNonErrorLockLostDoesNotBreakAnything(t *testing.T) {
tester := &rpcTester{t: t, ResponsesCh: make(chan *rpcTestResp, 1000), Accepted: make(chan *amqp.Message, 1)}
tester := &rpcTester{t: t, ResponsesCh: make(chan *rpcTestResp, 1000)}
link, err := NewRPCLink(context.Background(), RPCLinkArgs{
Client: &rpcTesterClient{
@@ -132,9 +129,6 @@ func TestRPCLinkNonErrorLockLostDoesNotBreakAnything(t *testing.T) {
require.ErrorAs(t, err, &rpcErr)
require.Equal(t, 400, rpcErr.RPCCode())
acceptedMessage := <-tester.Accepted
require.Equal(t, "response from service", acceptedMessage.Value, "successfully received message is accepted")
// validate that a normal error doesn't cause the response router to shut down
resp, err = link.RPC(context.Background(), &amqp.Message{
ApplicationProperties: map[string]any{
@@ -145,8 +139,6 @@
})
require.NoError(t, err)
require.Equal(t, "response from service", resp.Message.Value)
acceptedMessage = <-tester.Accepted
require.Equal(t, "response from service", acceptedMessage.Value, "successfully received message is accepted")
}
func TestRPCLinkClosingClean_SessionCreationFailed(t *testing.T) {
@@ -230,7 +222,7 @@ func TestRPCLinkClosingClean_CreationFailsButSessionCloseFailsToo(t *testing.T)
}
func TestRPCLinkClosingQuickly(t *testing.T) {
tester := &rpcTester{t: t, ResponsesCh: make(chan *rpcTestResp, 1000), Accepted: make(chan *amqp.Message, 1)}
tester := &rpcTester{t: t, ResponsesCh: make(chan *rpcTestResp, 1000)}
link, err := NewRPCLink(context.Background(), RPCLinkArgs{
Client: &rpcTesterClient{
@@ -245,7 +237,7 @@
}
func TestRPCLinkUsesCorrectFlags(t *testing.T) {
tester := &rpcTester{t: t, ResponsesCh: make(chan *rpcTestResp, 1000), Accepted: make(chan *amqp.Message, 1)}
tester := &rpcTester{t: t, ResponsesCh: make(chan *rpcTestResp, 1000)}
link, err := NewRPCLink(context.Background(), RPCLinkArgs{
Client: &rpcTesterClient{
@@ -272,9 +264,6 @@ type rpcTester struct {
amqpwrap.AMQPReceiverCloser
receiverOpts *amqp.ReceiverOptions
// Accepted contains all the messages where we called AcceptMessage(msg)
// We only call this when we
Accepted chan *amqp.Message
ResponsesCh chan *rpcTestResp
t *testing.T
}
@@ -317,12 +306,6 @@ func (tester *rpcTester) LinkName() string {
// receiver functions
func (tester *rpcTester) AcceptMessage(ctx context.Context, msg *amqp.Message) error {
require.NotNil(tester.t, tester.Accepted, "No messages should be AcceptMessage()'d since the tester.Accepted channel was nil")
tester.Accepted <- msg
return nil
}
func (tester *rpcTester) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) {
select {
case resp := <-tester.ResponsesCh:

View file

@@ -107,7 +107,7 @@ func parseSig(sigStr string) (*sig, error) {
case "skn":
parsed.skn = keyValue[1]
default:
return nil, fmt.Errorf(fmt.Sprintf("unknown key / value: %q", keyValue))
return nil, fmt.Errorf("unknown key / value: %q", keyValue)
}
}
return parsed, nil

View file

@@ -117,25 +117,33 @@ func setDefaults(o *exported.RetryOptions) {
 }
 
 // (adapted from azcore/policy_retry)
-func calcDelay(o exported.RetryOptions, try int32) time.Duration {
-	if try == 0 {
-		return 0
-	}
-
-	pow := func(number int64, exponent int32) int64 { // pow is nested helper function
-		var result int64 = 1
-		for n := int32(0); n < exponent; n++ {
-			result *= number
-		}
-		return result
-	}
-
-	delay := time.Duration(pow(2, try)-1) * o.RetryDelay
-	// Introduce some jitter: [0.0, 1.0) / 2 = [0.0, 0.5) + 0.8 = [0.8, 1.3)
-	delay = time.Duration(delay.Seconds() * (rand.Float64()/2 + 0.8) * float64(time.Second)) // NOTE: We want math/rand; not crypto/rand
-	if delay > o.MaxRetryDelay {
+func calcDelay(o exported.RetryOptions, try int32) time.Duration { // try is >=1; never 0
+	// avoid overflow when shifting left
+	factor := time.Duration(math.MaxInt64)
+	if try < 63 {
+		factor = time.Duration(int64(1<<try) - 1)
+	}
+
+	delay := factor * o.RetryDelay
+	if delay < factor {
+		// overflow has happened so set to max value
+		delay = time.Duration(math.MaxInt64)
+	}
+
+	// Introduce jitter: [0.0, 1.0) / 2 = [0.0, 0.5) + 0.8 = [0.8, 1.3)
+	jitterMultiplier := rand.Float64()/2 + 0.8 // NOTE: We want math/rand; not crypto/rand
+
+	delayFloat := float64(delay) * jitterMultiplier
+	if delayFloat > float64(math.MaxInt64) {
+		// the jitter pushed us over MaxInt64, so just use MaxInt64
+		delay = time.Duration(math.MaxInt64)
+	} else {
+		delay = time.Duration(delayFloat)
+	}
+
+	if delay > o.MaxRetryDelay { // MaxRetryDelay is backfilled with non-negative value
 		delay = o.MaxRetryDelay
 	}
 
 	return delay
 }

View file

@@ -92,9 +92,6 @@ func TestRetrier(t *testing.T) {
return errors.Is(err, context.Canceled)
}
customRetryOptions := fastRetryOptions
customRetryOptions.MaxRetries = 1
var actualAttempts []int32
maxRetries := int32(2)

View file

@@ -423,7 +423,7 @@ func newReceivedMessage(amqpMsg *amqp.Message, receiver amqpwrap.AMQPReceiver) *
// }
}
if amqpMsg.DeliveryTag != nil && len(amqpMsg.DeliveryTag) > 0 {
if len(amqpMsg.DeliveryTag) > 0 {
lockToken, err := lockTokenFromMessageTag(amqpMsg)
if err == nil {
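This simplification works because Go defines `len` of a nil slice as 0, so the explicit nil check on `DeliveryTag` was redundant. A tiny sketch (the `hasDeliveryTag` helper is illustrative, not SDK code):

```go
package sketch

// For any []byte b, len(b) == 0 when b is nil, so
// `b != nil && len(b) > 0` is exactly equivalent to `len(b) > 0`.
func hasDeliveryTag(b []byte) bool {
	return len(b) > 0 // safe even when b is nil
}
```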

View file

@@ -7,25 +7,19 @@ import (
"context"
"errors"
"fmt"
"net"
"regexp"
"sort"
"strings"
"sync/atomic"
"testing"
"time"
"log"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/sas"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test"
"github.com/Azure/go-amqp"
"github.com/stretchr/testify/require"
"nhooyr.io/websocket"
)
func TestReceiverBackupSettlement(t *testing.T) {
@@ -1049,91 +1043,6 @@ func TestReceiveWithDifferentWaitTime(t *testing.T) {
require.Greater(t, bigger, base2)
}
func TestReceiverConnectionTimeout(t *testing.T) {
getLogs := test.CaptureLogsForTest(false)
var conn *slowConn
newWebSocketConnFn := func(ctx context.Context, args NewWebSocketConnArgs) (net.Conn, error) {
t.Logf("Using websocket function")
opts := &websocket.DialOptions{Subprotocols: []string{"amqp"}}
wssConn, _, err := websocket.Dial(ctx, args.Host, opts)
if err != nil {
return nil, err
}
netConn := websocket.NetConn(ctx, wssConn, websocket.MessageBinary)
conn = &slowConn{t: t, Conn: netConn}
t.Logf("Returning slow connection")
return conn, nil
}
serviceBusClient := newServiceBusClientForTest(t, &test.NewClientOptions[ClientOptions]{
ClientOptions: &ClientOptions{
NewWebSocketConn: newWebSocketConnFn,
RetryOptions: exported.RetryOptions{
MaxRetryDelay: time.Nanosecond,
},
},
})
queueName, cleanup := createQueue(t, nil, nil)
t.Cleanup(cleanup)
sender, err := serviceBusClient.NewSender(queueName, nil)
require.NoError(t, err)
err = sender.SendMessage(context.Background(), &Message{
Body: []byte("hello world"),
}, nil)
require.NoError(t, err)
receiver, err := serviceBusClient.NewReceiverForQueue(queueName, nil)
require.NoError(t, err)
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
messages, err := receiver.ReceiveMessages(ctxWithTimeout, 1, nil)
require.NoError(t, err)
require.NotEmpty(t, messages)
log.Printf("\n\n\nReceived messages, lock expiration in %s\n\n\n", messages[0].LockedUntil)
// cause the connection to have arbitrarily low read/write timeouts - it'll force errors with
// this connection, at the lowest level, and we should see it bubble up as a connection recovery.
atomic.StoreInt64(&conn.slow, 1)
log.Printf("\n\n\nAbout to renew message lock\n\n\n")
err = receiver.RenewMessageLock(context.Background(), messages[0], nil)
require.NoError(t, err)
// check that the log messages made it in.
recovered := false
logs := getLogs()
for _, log := range logs {
if strings.Contains(log, "Recovered connection and links") {
recovered = true
}
}
if !recovered {
// dump out the logs so we can see what happened instead...
for _, log := range logs {
t.Logf("LOG: %s", log)
}
}
require.True(t, recovered)
log.Printf("\n\n\nDone with renew message lock, lock expiry time: %s\n\n\n", messages[0].LockedUntil)
}
type receivedMessageSlice []*ReceivedMessage
func (messages receivedMessageSlice) Len() int {
@@ -1147,40 +1056,3 @@ func (messages receivedMessageSlice) Less(i, j int) bool {
func (messages receivedMessageSlice) Swap(i, j int) {
messages[i], messages[j] = messages[j], messages[i]
}
type slowConn struct {
slow int64
t *testing.T
net.Conn
}
func (sc *slowConn) Read(b []byte) (n int, err error) {
if atomic.LoadInt64(&sc.slow) == 1 {
sc.t.Logf("Simulating broken reads")
err := sc.Conn.SetReadDeadline(time.Now().Add(time.Nanosecond))
if err != nil {
return 0, err
}
}
return sc.Conn.Read(b)
}
func (sc *slowConn) Write(b []byte) (n int, err error) {
if atomic.LoadInt64(&sc.slow) == 1 {
sc.t.Logf("Simulating broken writes")
err := sc.Conn.SetWriteDeadline(time.Now().Add(time.Nanosecond))
if err != nil {
return 0, err
}
}
return sc.Conn.Write(b)
}
func (sc *slowConn) Close() error {
sc.t.Logf("Closing connection")
return sc.Conn.Close()
}