406 строки
10 KiB
Go
406 строки
10 KiB
Go
package eventhub
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/Azure/go-amqp"
|
|
"github.com/Azure/go-autorest/autorest/to"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// testAmqpSender is a scripted fake that conforms to amqpSender. Every call
// to Send is counted and answered from a pre-loaded list of errors.
type testAmqpSender struct {
	// sendErrors[i] is returned by the i-th Send call (0-based); calls past
	// the end of the slice return nil.
	sendErrors []error
	// sendCount is the number of Send calls made so far.
	sendCount int
}
|
|
|
|
// recoveryCall captures the arguments of one invocation of the recovery
// callback handed to sendMessage, so tests can compare the full call history.
//
// The field order matters: the type is also built with a positional literal.
type recoveryCall struct {
	// linkID is the link identifier passed to the callback.
	linkID string
	// err is the send error that triggered the callback.
	err error
	// recover is the boolean flag passed to the callback.
	recover bool
}
|
|
|
|
func (s *testAmqpSender) LinkName() string {
|
|
return "sender-id"
|
|
}
|
|
|
|
func (s *testAmqpSender) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
|
|
var err error
|
|
|
|
if len(s.sendErrors) > s.sendCount {
|
|
err = s.sendErrors[s.sendCount]
|
|
}
|
|
|
|
s.sendCount++
|
|
return err
|
|
}
|
|
|
|
func (s *testAmqpSender) Close(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func TestSenderRetries(t *testing.T) {
|
|
var recoverCalls []recoveryCall
|
|
|
|
var sender *testAmqpSender
|
|
|
|
getAmqpSender := func() amqpSender {
|
|
return sender
|
|
}
|
|
|
|
recover := func(linkID string, err error, recover bool) {
|
|
recoverCalls = append(recoverCalls, recoveryCall{linkID, err, recover})
|
|
}
|
|
|
|
t.Run("SendSucceedsOnFirstTry", func(t *testing.T) {
|
|
recoverCalls = nil
|
|
sender = &testAmqpSender{}
|
|
|
|
err := sendMessage(context.TODO(), getAmqpSender, 3, nil, recover)
|
|
assert.NoError(t, err)
|
|
assert.EqualValues(t, 1, sender.sendCount)
|
|
assert.Empty(t, recoverCalls)
|
|
})
|
|
|
|
t.Run("SendExceedingRetries", func(*testing.T) {
|
|
recoverCalls = nil
|
|
sender = &testAmqpSender{
|
|
sendErrors: []error{
|
|
&amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
|
|
&amqp.SessionError{},
|
|
errors.New("We'll never attempt to use this one since we ran out of retries")},
|
|
}
|
|
|
|
actualErr := sendMessage(context.TODO(), getAmqpSender,
|
|
1, // note we're only allowing 1 retry attempt - so we get the first send() and then 1 additional.
|
|
nil, recover)
|
|
|
|
assert.EqualValues(t, &amqp.SessionError{}, actualErr)
|
|
assert.EqualValues(t, 2, sender.sendCount)
|
|
assert.EqualValues(t, []recoveryCall{
|
|
{
|
|
linkID: "sender-id",
|
|
err: &amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
|
|
recover: true,
|
|
},
|
|
{
|
|
linkID: "sender-id",
|
|
err: &amqp.SessionError{},
|
|
recover: true,
|
|
},
|
|
}, recoverCalls)
|
|
|
|
})
|
|
|
|
t.Run("SendWithUnrecoverableAndNonRetryableError", func(*testing.T) {
|
|
recoverCalls = nil
|
|
sender = &testAmqpSender{
|
|
sendErrors: []error{
|
|
errors.New("Anything not explicitly retryable kills all retries"),
|
|
&amqp.ConnError{}, // we'll never get here.
|
|
},
|
|
}
|
|
|
|
actualErr := sendMessage(context.TODO(), getAmqpSender, 5, nil, recover)
|
|
|
|
assert.EqualValues(t, errors.New("Anything not explicitly retryable kills all retries"), actualErr)
|
|
assert.EqualValues(t, 1, sender.sendCount)
|
|
assert.Empty(t, recoverCalls, "No recovery attempts should happen for non-recoverable errors")
|
|
})
|
|
|
|
t.Run("SendIsNotRecoverableIfLinkIsClosed", func(*testing.T) {
|
|
recoverCalls = nil
|
|
sender = &testAmqpSender{
|
|
sendErrors: []error{
|
|
&amqp.LinkError{}, // this is no longer considered a retryable error (ErrLinkDetached is, however)
|
|
},
|
|
}
|
|
|
|
actualErr := sendMessage(context.TODO(), getAmqpSender, 5, nil, recover)
|
|
|
|
var linkErr *amqp.LinkError
|
|
assert.ErrorAs(t, actualErr, &linkErr)
|
|
assert.EqualValues(t, 1, sender.sendCount)
|
|
assert.Empty(t, recoverCalls, "No recovery attempts should happen for non-recoverable errors")
|
|
})
|
|
|
|
t.Run("SendWithAmqpErrors", func(*testing.T) {
|
|
recoverCalls = nil
|
|
sender = &testAmqpSender{
|
|
sendErrors: []error{&amqp.Error{
|
|
// retry but doesn't attempt to recover the connection
|
|
Condition: errorServerBusy,
|
|
}, &amqp.Error{
|
|
// retry but doesn't attempt to recover the connection
|
|
Condition: errorTimeout,
|
|
},
|
|
&amqp.Error{
|
|
// retry and will attempt to recover the connection
|
|
Condition: amqp.ErrCondNotImplemented,
|
|
}},
|
|
}
|
|
|
|
err := sendMessage(context.TODO(), getAmqpSender, 6, nil, recover)
|
|
assert.NoError(t, err)
|
|
assert.EqualValues(t, 4, sender.sendCount)
|
|
assert.EqualValues(t, []recoveryCall{
|
|
{
|
|
linkID: "sender-id",
|
|
err: &amqp.Error{
|
|
Condition: errorServerBusy,
|
|
},
|
|
recover: false,
|
|
},
|
|
{
|
|
linkID: "sender-id",
|
|
err: &amqp.Error{
|
|
Condition: errorTimeout,
|
|
},
|
|
recover: false,
|
|
},
|
|
{
|
|
linkID: "sender-id",
|
|
err: &amqp.Error{
|
|
Condition: amqp.ErrCondNotImplemented,
|
|
},
|
|
recover: true,
|
|
},
|
|
}, recoverCalls)
|
|
})
|
|
|
|
t.Run("SendWithDetachOrNetErrors", func(*testing.T) {
|
|
recoverCalls = nil
|
|
sender = &testAmqpSender{
|
|
sendErrors: []error{
|
|
&amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
|
|
&net.DNSError{},
|
|
},
|
|
}
|
|
|
|
err := sendMessage(context.TODO(), getAmqpSender, 6, nil, recover)
|
|
assert.NoError(t, err)
|
|
assert.EqualValues(t, 3, sender.sendCount)
|
|
assert.EqualValues(t, []recoveryCall{
|
|
{
|
|
linkID: "sender-id",
|
|
err: &amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
|
|
recover: true,
|
|
},
|
|
{
|
|
linkID: "sender-id",
|
|
err: &net.DNSError{},
|
|
recover: true,
|
|
},
|
|
}, recoverCalls)
|
|
})
|
|
|
|
t.Run("SendWithRecoverableCloseError", func(*testing.T) {
|
|
recoverCalls = nil
|
|
sender = &testAmqpSender{
|
|
sendErrors: []error{
|
|
&amqp.ConnError{},
|
|
&amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
|
|
&amqp.SessionError{},
|
|
},
|
|
}
|
|
|
|
err := sendMessage(context.TODO(), getAmqpSender, 6, nil, recover)
|
|
assert.NoError(t, err)
|
|
assert.EqualValues(t, 4, sender.sendCount)
|
|
assert.EqualValues(t, []recoveryCall{
|
|
{
|
|
linkID: "sender-id",
|
|
err: &amqp.ConnError{},
|
|
recover: true,
|
|
},
|
|
{
|
|
linkID: "sender-id",
|
|
err: &amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
|
|
recover: true,
|
|
},
|
|
{
|
|
linkID: "sender-id",
|
|
err: &amqp.SessionError{},
|
|
recover: true,
|
|
},
|
|
}, recoverCalls)
|
|
})
|
|
|
|
t.Run("SendWithInfiniteRetries", func(*testing.T) {
|
|
maxRetries := -1
|
|
recoverCalls = nil
|
|
sender = &testAmqpSender{
|
|
sendErrors: []error{
|
|
// kind of silly but let's just make sure we would continue to retry.
|
|
&amqp.ConnError{},
|
|
&amqp.ConnError{},
|
|
&amqp.ConnError{},
|
|
},
|
|
}
|
|
|
|
err := sendMessage(context.TODO(), getAmqpSender, maxRetries, nil, recover)
|
|
assert.NoError(t, err, "Last call succeeds")
|
|
assert.EqualValues(t, 3+1, sender.sendCount)
|
|
assert.EqualValues(t, recoverCalls, []recoveryCall{
|
|
{linkID: "sender-id", err: &amqp.ConnError{}, recover: true},
|
|
{linkID: "sender-id", err: &amqp.ConnError{}, recover: true},
|
|
{linkID: "sender-id", err: &amqp.ConnError{}, recover: true},
|
|
})
|
|
})
|
|
|
|
t.Run("SendWithNoRetries", func(*testing.T) {
|
|
maxRetries := 0
|
|
recoverCalls = nil
|
|
sender = &testAmqpSender{
|
|
sendErrors: []error{
|
|
&amqp.ConnError{}, // this is normally a retryable error _but_ we disabled retries.
|
|
},
|
|
}
|
|
|
|
err := sendMessage(context.TODO(), getAmqpSender, maxRetries, nil, recover)
|
|
assert.EqualValues(t, &amqp.ConnError{}, err)
|
|
assert.EqualValues(t, maxRetries+1, sender.sendCount)
|
|
assert.EqualValues(t, recoverCalls, []recoveryCall{
|
|
{linkID: "sender-id", err: &amqp.ConnError{}, recover: true},
|
|
})
|
|
})
|
|
|
|
t.Run("SendRespectsContextStatus", func(*testing.T) {
|
|
maxRetries := 0
|
|
recoverCalls = nil
|
|
sender = &testAmqpSender{
|
|
sendErrors: []error{
|
|
&amqp.ConnError{}, // this is normally a retryable error _but_ we disabled retries.
|
|
},
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
|
|
err := sendMessage(ctx, getAmqpSender, maxRetries, nil, recover)
|
|
assert.EqualValues(t, context.Canceled, err)
|
|
assert.EqualValues(t, 0, sender.sendCount)
|
|
assert.Empty(t, recoverCalls)
|
|
})
|
|
}
|
|
|
|
// FakeLocker wraps a mutex and carries a hook that its Unlock method runs
// before releasing the mutex. The recovery tests install it as the L of a
// sync.Cond to observe state at the moment the lock is about to be released.
type FakeLocker struct {
	// afterBlock1 is invoked by Unlock before the mutex is released.
	afterBlock1 func()
	// mu is the underlying mutex; it must be non-nil before Lock or Unlock is used.
	mu *sync.Mutex
}
|
|
|
|
func (l FakeLocker) Lock() {
|
|
l.mu.Lock()
|
|
}
|
|
func (l FakeLocker) Unlock() {
|
|
l.afterBlock1()
|
|
l.mu.Unlock()
|
|
}
|
|
|
|
// TestRecoveryBlock1 tests recoverWithExpectedLinkID function's first "block" of code that
|
|
// decides if we are going to recover the link, ignore it, or wait for an in-progress recovery to
|
|
// complete.
|
|
func TestRecoveryBlock1(t *testing.T) {
|
|
t.Run("Empty link ID skips link ID checking and just does recovery", func(t *testing.T) {
|
|
cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) {
|
|
require.True(t, s.recovering)
|
|
})
|
|
|
|
defer cleanup()
|
|
|
|
err := sender.recoverWithExpectedLinkID(context.TODO(), "")
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("Matching link ID does recovery", func(t *testing.T) {
|
|
cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) {
|
|
require.True(t, s.recovering, "s.recovering should be true since the lock is available and we our expected link ID matches")
|
|
})
|
|
|
|
defer cleanup()
|
|
|
|
err := sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id")
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("Non-matching link ID skips recovery", func(t *testing.T) {
|
|
cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) {
|
|
require.False(t, s.recovering, "s.recovering should be false - the link ID isn't current, so nothing needs to be closed/recovered")
|
|
})
|
|
|
|
defer cleanup()
|
|
|
|
err := sender.recoverWithExpectedLinkID(context.TODO(), "non-matching-link-id")
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
// TODO: can't quite test this one
|
|
// t.Run("Already recovering, should wait for condition variable", func(t *testing.T) {
|
|
// cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) {
|
|
// })
|
|
|
|
// defer cleanup()
|
|
|
|
// sender.recovering = true // oops, someone else is already recovering
|
|
// sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id")
|
|
// })
|
|
}
|
|
|
|
func TestAMQPSenderIsCompatibleWithInterface(t *testing.T) {
|
|
var validateCompile amqpSender = &amqp.Sender{}
|
|
require.NotNil(t, validateCompile)
|
|
}
|
|
|
|
// fakeSender is a minimal sender stub used by the recovery tests. It sends
// nothing and only remembers whether Close was called.
type fakeSender struct {
	// id is the value reported by ID.
	id string
	// closed is set to true by Close.
	closed bool
}
|
|
|
|
func (s *fakeSender) ID() string {
|
|
return s.id
|
|
}
|
|
|
|
func (s *fakeSender) LinkName() string {
|
|
return "the-actual-link-id"
|
|
}
|
|
|
|
func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
|
|
return nil
|
|
}
|
|
func (s *fakeSender) Close(ctx context.Context) error {
|
|
s.closed = true
|
|
return nil
|
|
}
|
|
|
|
func createRecoveryBlock1Sender(t *testing.T, afterBlock1 func(s *sender)) (func(), *sender) {
|
|
s := &sender{
|
|
partitionID: to.StringPtr("0"),
|
|
hub: &Hub{
|
|
namespace: &namespace{},
|
|
},
|
|
}
|
|
|
|
s.sender.Store(&fakeSender{
|
|
id: "the-actual-link-id",
|
|
})
|
|
|
|
s.cond = &sync.Cond{
|
|
L: FakeLocker{
|
|
mu: &sync.Mutex{},
|
|
afterBlock1: func() {
|
|
afterBlock1(s)
|
|
panic("Panicking to exit before block 2")
|
|
},
|
|
}}
|
|
|
|
return func() {
|
|
val := recover()
|
|
require.EqualValues(t, "Panicking to exit before block 2", val)
|
|
}, s
|
|
}
|