Merge pull request #232 from Azure/recovery-changes

The recovery algorithm could get into a cycle where two goroutines could continue to recover, even when a new link was in place. This can cause a lot of thrash since each recovery of the link will interrupt any in-progress sends, causing even more recoveries, etc...

This PR fixes that by checking the link ID that we are trying to recycle. If it doesn't match then the link must already have been recovered so we can no-op and early return.
This commit is contained in:
Richard Park 2021-08-03 12:39:14 -07:00 коммит произвёл GitHub
Родитель f16940d69e 4a2d1e6e23
Коммит b794b15677
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
9 изменённых файлов: 389 добавлений и 51 удалений

Просмотреть файл

@ -1,5 +1,8 @@
# Change Log
## `v3.3.12`
- Fix bug in sender.Recover() where recovery could get stuck when a link was throttled. [PR#232](https://github.com/Azure/azure-event-hubs-go/pull/232)
## `v3.3.11`
- Allow for controlling the maximum retry count when sending messages. [#226](https://github.com/Azure/azure-event-hubs-go/issues/226)

11
go.mod
Просмотреть файл

@ -5,15 +5,14 @@ go 1.13
require (
github.com/Azure/azure-amqp-common-go/v3 v3.0.1
github.com/Azure/azure-pipeline-go v0.1.9
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
github.com/Azure/azure-storage-blob-go v0.6.0
github.com/Azure/go-amqp v0.13.1
github.com/Azure/go-autorest/autorest v0.11.3
github.com/Azure/go-autorest/autorest/adal v0.9.0
github.com/Azure/go-amqp v0.13.10
github.com/Azure/go-autorest/autorest v0.11.18
github.com/Azure/go-autorest/autorest/adal v0.9.13
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
github.com/Azure/go-autorest/autorest/date v0.3.0
github.com/Azure/go-autorest/autorest/to v0.3.0
github.com/Azure/go-autorest/autorest/validation v0.2.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0
github.com/devigned/tab v0.1.1
github.com/joho/godotenv v1.3.0
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7

34
go.sum
Просмотреть файл

@ -3,24 +3,26 @@ github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGib
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.1.9 h1:u7JFb9fFTE6Y/j8ae2VK33ePrRqJqoCM/IWkQdAZ+rg=
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible h1:aFlw3lP7ZHQi4m1kWCpcwYtczhDkGhDoRaMTaxcOf68=
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.6.0 h1:SEATKb3LIHcaSIX+E6/K4kJpwfuozFEsmt5rS56N6CE=
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/go-amqp v0.13.0/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs=
github.com/Azure/go-amqp v0.13.1 h1:dXnEJ89Hf7wMkcBbLqvocZlM4a3uiX9uCxJIvU77+Oo=
github.com/Azure/go-amqp v0.13.1/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs=
github.com/Azure/go-amqp v0.13.10 h1:+W1UMoJUFNwyzmslWxhxkM2VZjZprJ2tO2AOYPReeZo=
github.com/Azure/go-amqp v0.13.10/go.mod h1:D5ZrjQqB1dyp1A+G73xeL/kNn7D5qHJIIsNNps7YNmk=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0=
github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/TmQd5sSI5u2Ws=
github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA=
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc=
github.com/Azure/go-autorest/autorest/adal v0.8.1/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q=
github.com/Azure/go-autorest/autorest/adal v0.9.0 h1:SigMbuFNuKgc1xcGhaeapbh+8fgsu+GxgDRFyg7f5lM=
github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2 h1:iM6UAvjR97ZIeR93qTcwpKNMpV+/FTWjwEbuPD495Tk=
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM=
github.com/Azure/go-autorest/autorest/azure/cli v0.3.1 h1:LXl088ZQlP0SBppGFsRZonW6hSvwgL5gRByMbvUbx8U=
@ -32,15 +34,17 @@ github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSY
github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN3SVSiiO77gL2j2ronKKP0syM=
github.com/Azure/go-autorest/autorest/mocks v0.4.0 h1:z20OWOSG5aCye0HEkDp6TPmP17ZcfeMxPi6HnSALa8c=
github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/to v0.3.0 h1:zebkZaadz7+wIQYgC7GXaz3Wb28yKYfVkkBKwc38VF8=
github.com/Azure/go-autorest/autorest/to v0.3.0/go.mod h1:MgwOyqaIuKdG4TL/2ywSsIWKAfJfgHDo8ObuUk3t5sA=
github.com/Azure/go-autorest/autorest/validation v0.2.0 h1:15vMO4y76dehZSq7pAaOLQxC6dZYsSrj2GQpflyM/L4=
github.com/Azure/go-autorest/autorest/validation v0.2.0/go.mod h1:3EEqHnBxQGHXRYq3HT1WyXAvT7LLY3tl70hw6tQIbjI=
github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac=
github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
github.com/Azure/go-autorest/logger v0.2.0 h1:e4RVHVZKC5p6UANLJHkM4OfR1UKZPj8Wt8Pcx+3oqrE=
github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
@ -49,10 +53,11 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TRo4=
github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
@ -83,8 +88,9 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=

6
hub.go
Просмотреть файл

@ -779,17 +779,13 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) {
}
func isRecoverableCloseError(err error) bool {
return isConnectionClosed(err) || isSessionClosed(err) || isLinkClosed(err)
return isConnectionClosed(err) || isSessionClosed(err) || err == amqp.ErrLinkDetached
}
func isConnectionClosed(err error) bool {
return err == amqp.ErrConnClosed
}
func isLinkClosed(err error) bool {
return err == amqp.ErrLinkClosed
}
func isSessionClosed(err error) bool {
return err == amqp.ErrSessionClosed
}

Просмотреть файл

@ -41,6 +41,7 @@ import (
"github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-amqp-common-go/v3/sas"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/go-amqp"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -765,3 +766,10 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) {
t.Fatalf("did not set appropriate endpoint suffix. Expected: %v, Received: %v", azure.PublicCloud.ServiceBusEndpointSuffix, h.namespace.host)
}
}
// TestIsRecoverableCloseError pins down which link-level errors trigger
// recovery: a detach (server/transport initiated) is recoverable, while an
// explicit close by the caller is not.
func TestIsRecoverableCloseError(t *testing.T) {
	require.True(t, isRecoverableCloseError(amqp.ErrLinkDetached))

	// if the caller closes a link we shouldn't reopen or create a new one to replace it
	require.False(t, isRecoverableCloseError(amqp.ErrLinkClosed))
}

Просмотреть файл

@ -0,0 +1,74 @@
package stress
import (
"context"
"log"
"github.com/devigned/tab"
)
// StderrTracer wraps a tab.NoOpTracer and layers a very simple stderr-backed
// logger on top of it. Useful for observing internal state changes (like
// retries) that aren't normally customer visible.
type StderrTracer struct {
	NoOpTracer *tab.NoOpTracer
}

// StartSpan delegates to the embedded NoOpTracer.
func (t *StderrTracer) StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, tab.Spanner) {
	return t.NoOpTracer.StartSpan(ctx, operationName, opts...)
}

// StartSpanWithRemoteParent delegates to the embedded NoOpTracer.
func (t *StderrTracer) StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier tab.Carrier, opts ...interface{}) (context.Context, tab.Spanner) {
	return t.NoOpTracer.StartSpanWithRemoteParent(ctx, operationName, carrier, opts...)
}

// FromContext wraps the NoOpTracer's spanner in a stderrSpanner so that its
// Logger() hands back our stderr logger.
func (t *StderrTracer) FromContext(ctx context.Context) tab.Spanner {
	return &stderrSpanner{
		spanner: t.NoOpTracer.FromContext(ctx),
	}
}

// NewContext delegates to the embedded NoOpTracer.
func (t *StderrTracer) NewContext(parent context.Context, span tab.Spanner) context.Context {
	return t.NoOpTracer.NewContext(parent, span)
}
// stderrSpanner wraps another tab.Spanner purely so Logger() can return a
// stderrLogger; all other tracing operations are no-ops or forwards.
type stderrSpanner struct {
	spanner tab.Spanner
}

// AddAttributes is a no-op: attributes are not logged.
func (s *stderrSpanner) AddAttributes(attributes ...tab.Attribute) {}

// End is a no-op.
func (s *stderrSpanner) End() {}

// Logger returns a logger that writes through the standard log package.
func (s *stderrSpanner) Logger() tab.Logger {
	return &stderrLogger{}
}

// Inject is a no-op and always reports success.
func (s *stderrSpanner) Inject(carrier tab.Carrier) error {
	return nil
}

// InternalSpan forwards to the wrapped spanner.
func (s *stderrSpanner) InternalSpan() interface{} {
	return s.spanner.InternalSpan()
}
// stderrLogger writes leveled messages via the standard library logger
// (which defaults to stderr). Attributes are intentionally ignored.
type stderrLogger struct{}

func (l *stderrLogger) Info(msg string, attributes ...tab.Attribute) {
	log.Printf("INFO: %s", msg)
}

func (l *stderrLogger) Error(err error, attributes ...tab.Attribute) {
	log.Printf("ERROR: %s", err.Error())
}

func (l *stderrLogger) Fatal(msg string, attributes ...tab.Attribute) {
	// NOTE(review): unlike log.Fatalf this does not exit the process — it
	// only logs the message at FATAL level.
	log.Printf("FATAL: %s", msg)
}

func (l *stderrLogger) Debug(msg string, attributes ...tab.Attribute) {
	log.Printf("DEBUG: %s", msg)
}

Просмотреть файл

@ -0,0 +1,95 @@
package main
import (
"context"
"log"
"os"
"sync"
"sync/atomic"
"time"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/internal/stress"
"github.com/devigned/tab"
"github.com/joho/godotenv"
)
// main drives a send-side stress test against an Event Hub. It requires
// EVENTHUB_CONNECTION_STRING in the environment (optionally loaded from a
// .env file at the repository root).
func main() {
	// best-effort load; error deliberately ignored so the tool still runs
	// when no .env file exists and the variable is set directly
	godotenv.Load("../../../.env")

	cs := os.Getenv("EVENTHUB_CONNECTION_STRING")
	hub, err := eventhub.NewHubFromConnectionString(cs)

	if err != nil {
		log.Fatalf("Failed to create hub: %s", err.Error())
	}

	// Generate some large batches of messages and send them in parallel.
	// The Go SDK is fast enough that this will cause a 1TU instance to throttle
	// us, allowing you to see how our code reacts to it.
	tab.Register(&stress.StderrTracer{NoOpTracer: &tab.NoOpTracer{}})

	lastExpectedId := sendMessages(hub)

	log.Printf("Sending complete, last expected ID = %d", lastExpectedId)
}
// sendMessages pre-builds several large event batches and sends them to the
// hub concurrently (bounded by a 2 minute overall deadline). It returns the
// last test ID that was assigned, so a consumer can verify all events arrived.
func sendMessages(hub *eventhub.Hub) int64 {
	var batches []eventhub.BatchIterator
	nextTestId := int64(0)

	log.Printf("Creating event batches")

	for i := 0; i < 10; i++ {
		batches = append(batches, createEventBatch(&nextTestId))
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
	defer cancel()

	wg := &sync.WaitGroup{}

	log.Printf("Sending event batches")

	var totalBatches int64 = 0

	for i, batch := range batches {
		wg.Add(1)
		go func(idx int, batch eventhub.BatchIterator) {
			// deferred so the WaitGroup accounting is correct on every exit
			// path (the original called Done before the progress log, which
			// let main's Wait return while goroutines were still logging)
			defer wg.Done()

			if err := hub.SendBatch(ctx, batch); err != nil {
				log.Fatalf("ERROR sending batch: %s", err.Error())
			}

			// use AddInt64's return value for the progress message: re-reading
			// totalBatches with a plain load would race with the other senders
			sent := atomic.AddInt64(&totalBatches, 1)
			log.Printf("[%d/%d] sent...", sent, len(batches))
		}(i, batch)
	}

	wg.Wait()
	return nextTestId - 1
}
// createEventBatch builds a batch of events, each carrying a 1KB payload and
// a monotonically increasing "testId" property. The counter behind testId is
// advanced once per event so IDs stay unique across multiple batches.
func createEventBatch(testId *int64) eventhub.BatchIterator {
	// simple minimum
	const batchSize = 880

	var payload = [1024]byte{1}

	events := make([]*eventhub.Event, 0, batchSize)

	for i := 0; i < batchSize; i++ {
		events = append(events, &eventhub.Event{
			Data: payload[:],
			Properties: map[string]interface{}{
				"testId": *testId,
			},
		})

		*testId++
	}

	return eventhub.NewEventBatchIterator(events...)
}

Просмотреть файл

@ -67,7 +67,9 @@ type (
// amqpSender is the bare minimum we need from an AMQP based sender.
// (used for testing)
// Implemented by *amqp.Sender
amqpSender interface {
ID() string
Send(ctx context.Context, msg *amqp.Message) error
Close(ctx context.Context) error
}
@ -114,21 +116,33 @@ func (h *Hub) newSender(ctx context.Context, retryOptions *senderRetryOptions) (
}
func (s *sender) amqpSender() amqpSender {
return s.sender.Load().(*amqp.Sender)
// in reality, an *amqp.Sender
return s.sender.Load().(amqpSender)
}
// Recover will attempt to close the current session and link, then rebuild them.
// Note that while the implementation will ensure that Recover() is goroutine safe
// it won't prevent excessive connection recovery. E.g. if a Recover() is in progress
// and is in block 2, any additional calls to Recover() will wait at block 1 to
// restart the recovery process once block 2 exits.
// Recover will attempt to close the current connection, session and link, then rebuild them.
func (s *sender) Recover(ctx context.Context) error {
return s.recoverWithExpectedLinkID(ctx, "")
}
// recoverWithExpectedLinkID attempts to recover the link as cheaply as possible.
// - It does not recover the link if expectedLinkID is not "" and does NOT match
// the current link ID, as this would indicate that the previous bad link has
// already been closed and removed.
func (s *sender) recoverWithExpectedLinkID(ctx context.Context, expectedLinkID string) error {
span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.Recover")
defer span.End()
recover := false
// acquire exclusive lock to see if this goroutine should recover
s.cond.L.Lock() // block 1
if !s.recovering {
// if the link they started with has already been closed and removed we don't
// need to trigger an additional recovery.
if expectedLinkID != "" && s.amqpSender().ID() != expectedLinkID {
tab.For(ctx).Debug("original linkID does not match, no recovery necessary")
} else if !s.recovering {
// another goroutine isn't recovering, so this one will
tab.For(ctx).Debug("will recover connection")
s.recovering = true
@ -138,7 +152,9 @@ func (s *sender) Recover(ctx context.Context) error {
tab.For(ctx).Debug("waiting for connection to recover")
s.cond.Wait()
}
s.cond.L.Unlock()
var err error
if recover {
tab.For(ctx).Debug("recovering connection")
@ -147,10 +163,15 @@ func (s *sender) Recover(ctx context.Context) error {
defer cancel()
// update shared state
s.cond.L.Lock() // block 2
// TODO: we should be able to recover more quickly if we don't close the connection
// to recover (and just attempt to recreate the link). newSessionAndLink, currently,
// creates a new connection so we'd need to change that.
_ = s.amqpSender().Close(closeCtx)
_ = s.session.Close(closeCtx)
_ = s.connection.Close()
err = s.newSessionAndLink(ctx)
s.recovering = false
s.cond.L.Unlock()
// signal to waiters that recovery is complete
@ -238,7 +259,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
// create a per goroutine copy as Duration() and Reset() modify its state
backoff := s.retryOptions.recoveryBackoff.Copy()
recvr := func(err error, recover bool) {
recvr := func(linkID string, err error, recover bool) {
duration := backoff.Duration()
tab.For(ctx).Debug("amqp error, delaying " + strconv.FormatInt(int64(duration/time.Millisecond), 10) + " millis: " + err.Error())
select {
@ -249,7 +270,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
return
}
if recover {
err = s.Recover(ctx)
err = s.recoverWithExpectedLinkID(ctx, linkID)
if err != nil {
tab.For(ctx).Debug("failed to recover connection")
} else {
@ -265,7 +286,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
return sendMessage(ctx, s.amqpSender, s.retryOptions.maxRetries, msg, recvr)
}
func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries int, msg *amqp.Message, recoverConnection func(err error, recover bool)) error {
func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries int, msg *amqp.Message, recoverLink func(linkID string, err error, recover bool)) error {
var lastError error
// maxRetries >= 0 == finite retries
@ -275,7 +296,8 @@ func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries in
case <-ctx.Done():
return ctx.Err()
default:
err := getAmqpSender().Send(ctx, msg)
sender := getAmqpSender()
err := sender.Send(ctx, msg)
if err == nil {
return err
}
@ -285,18 +307,18 @@ func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries in
switch e := err.(type) {
case *amqp.Error:
if e.Condition == errorServerBusy || e.Condition == errorTimeout {
recoverConnection(err, false)
recoverLink(sender.ID(), err, false)
break
}
recoverConnection(err, true)
recoverLink(sender.ID(), err, true)
case *amqp.DetachError, net.Error:
recoverConnection(err, true)
recoverLink(sender.ID(), err, true)
default:
if !isRecoverableCloseError(err) {
return err
}
recoverConnection(err, true)
recoverLink(sender.ID(), err, true)
}
}
}
@ -319,7 +341,7 @@ func (s *sender) getFullIdentifier() string {
return s.hub.namespace.getEntityAudience(s.getAddress())
}
// newSessionAndLink will replace the existing session and link
// newSessionAndLink will replace the existing connection, session and link
func (s *sender) newSessionAndLink(ctx context.Context) error {
span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.newSessionAndLink")
defer span.End()

Просмотреть файл

@ -4,10 +4,13 @@ import (
"context"
"errors"
"net"
"sync"
"testing"
"github.com/Azure/go-amqp"
"github.com/Azure/go-autorest/autorest/to"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// conforms to amqpSender
@ -17,10 +20,15 @@ type testAmqpSender struct {
}
type recoveryCall struct {
linkID string
err error
recover bool
}
func (s *testAmqpSender) ID() string {
return "sender-id"
}
func (s *testAmqpSender) Send(ctx context.Context, msg *amqp.Message) error {
var err error
@ -45,8 +53,8 @@ func TestSenderRetries(t *testing.T) {
return sender
}
recover := func(err error, recover bool) {
recoverCalls = append(recoverCalls, recoveryCall{err, recover})
recover := func(linkID string, err error, recover bool) {
recoverCalls = append(recoverCalls, recoveryCall{linkID, err, recover})
}
t.Run("SendSucceedsOnFirstTry", func(t *testing.T) {
@ -63,7 +71,7 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
amqp.ErrLinkClosed,
amqp.ErrLinkDetached,
amqp.ErrSessionClosed,
errors.New("We'll never attempt to use this one since we ran out of retries")},
}
@ -76,10 +84,12 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, 2, sender.sendCount)
assert.EqualValues(t, []recoveryCall{
{
err: amqp.ErrLinkClosed,
linkID: "sender-id",
err: amqp.ErrLinkDetached,
recover: true,
},
{
linkID: "sender-id",
err: amqp.ErrSessionClosed,
recover: true,
},
@ -103,6 +113,21 @@ func TestSenderRetries(t *testing.T) {
assert.Empty(t, recoverCalls, "No recovery attempts should happen for non-recoverable errors")
})
t.Run("SendIsNotRecoverableIfLinkIsClosed", func(*testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
amqp.ErrLinkClosed, // this is no longer considered a retryable error (ErrLinkDetached is, however)
},
}
actualErr := sendMessage(context.TODO(), getAmqpSender, 5, nil, recover)
assert.EqualValues(t, amqp.ErrLinkClosed, actualErr)
assert.EqualValues(t, 1, sender.sendCount)
assert.Empty(t, recoverCalls, "No recovery attempts should happen for non-recoverable errors")
})
t.Run("SendWithAmqpErrors", func(*testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
@ -124,18 +149,21 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, 4, sender.sendCount)
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: &amqp.Error{
Condition: errorServerBusy,
},
recover: false,
},
{
linkID: "sender-id",
err: &amqp.Error{
Condition: errorTimeout,
},
recover: false,
},
{
linkID: "sender-id",
err: &amqp.Error{
Condition: amqp.ErrorNotImplemented,
},
@ -158,10 +186,12 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, 3, sender.sendCount)
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: &amqp.DetachError{},
recover: true,
},
{
linkID: "sender-id",
err: &net.DNSError{},
recover: true,
},
@ -173,7 +203,7 @@ func TestSenderRetries(t *testing.T) {
sender = &testAmqpSender{
sendErrors: []error{
amqp.ErrConnClosed,
amqp.ErrLinkClosed,
amqp.ErrLinkDetached,
amqp.ErrSessionClosed,
},
}
@ -183,14 +213,17 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, 4, sender.sendCount)
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: amqp.ErrConnClosed,
recover: true,
},
{
err: amqp.ErrLinkClosed,
linkID: "sender-id",
err: amqp.ErrLinkDetached,
recover: true,
},
{
linkID: "sender-id",
err: amqp.ErrSessionClosed,
recover: true,
},
@ -213,9 +246,9 @@ func TestSenderRetries(t *testing.T) {
assert.NoError(t, err, "Last call succeeds")
assert.EqualValues(t, 3+1, sender.sendCount)
assert.EqualValues(t, recoverCalls, []recoveryCall{
{err: amqp.ErrConnClosed, recover: true},
{err: amqp.ErrConnClosed, recover: true},
{err: amqp.ErrConnClosed, recover: true},
{linkID: "sender-id", err: amqp.ErrConnClosed, recover: true},
{linkID: "sender-id", err: amqp.ErrConnClosed, recover: true},
{linkID: "sender-id", err: amqp.ErrConnClosed, recover: true},
})
})
@ -232,7 +265,7 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, amqp.ErrConnClosed, err)
assert.EqualValues(t, maxRetries+1, sender.sendCount)
assert.EqualValues(t, recoverCalls, []recoveryCall{
{err: amqp.ErrConnClosed, recover: true},
{linkID: "sender-id", err: amqp.ErrConnClosed, recover: true},
})
})
@ -254,3 +287,105 @@ func TestSenderRetries(t *testing.T) {
assert.Empty(t, recoverCalls)
})
}
type FakeLocker struct {
afterBlock1 func()
mu *sync.Mutex
}
func (l FakeLocker) Lock() {
l.mu.Lock()
}
func (l FakeLocker) Unlock() {
l.afterBlock1()
l.mu.Unlock()
}
// TestRecoveryBlock1 tests recoverWithExpectedLinkID function's first "block" of code that
// decides if we are going to recover the link, ignore it, or wait for an in-progress recovery to
// complete. Each case relies on createRecoveryBlock1Sender, whose rigged
// Locker runs the callback after block 1 releases the lock and then panics so
// block 2 (the real close/rebuild) never executes.
func TestRecoveryBlock1(t *testing.T) {
	t.Run("Empty link ID skips link ID checking and just does recovery", func(t *testing.T) {
		cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) {
			// the hook fires as block 1's lock is released, so `recovering`
			// reflects the decision block 1 just made
			require.True(t, s.recovering)
		})
		defer cleanup()

		sender.recoverWithExpectedLinkID(context.TODO(), "")
	})

	t.Run("Matching link ID does recovery", func(t *testing.T) {
		cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) {
			require.True(t, s.recovering, "s.recovering should be true since the lock is available and we have our expected link ID matches")
		})
		defer cleanup()

		sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id")
	})

	t.Run("Non-matching link ID skips recovery", func(t *testing.T) {
		cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) {
			require.False(t, s.recovering, "s.recovering should be false - the link ID isn't current, so nothing needs to be closed/recovered")
		})
		defer cleanup()

		sender.recoverWithExpectedLinkID(context.TODO(), "non-matching-link-id")
	})

	// TODO: can't quite test this one
	// t.Run("Already recovering, should wait for condition variable", func(t *testing.T) {
	// 	cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) {
	// 	})
	// 	defer cleanup()

	// 	sender.recovering = true // oops, someone else is already recovering
	// 	sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id")
	// })
}
// fakeSender is a canned amqpSender implementation for tests: it reports a
// fixed link ID, "sends" without doing any work, and records whether Close
// was called.
type fakeSender struct {
	id     string
	closed bool
}

// ID returns the link ID the fake was constructed with.
func (f *fakeSender) ID() string {
	return f.id
}

// Send accepts any message and always succeeds.
func (f *fakeSender) Send(ctx context.Context, msg *amqp.Message) error {
	return nil
}

// Close marks the sender as closed and reports success.
func (f *fakeSender) Close(ctx context.Context) error {
	f.closed = true
	return nil
}
// createRecoveryBlock1Sender builds a *sender whose cond.Locker is rigged:
// when block 1 of recoverWithExpectedLinkID releases the lock, the supplied
// afterBlock1 callback gets to inspect the sender's state, and then a panic
// aborts the function before block 2 (the real close/rebuild) can run.
// The returned cleanup func must be deferred by the caller; it recovers the
// deliberate panic and asserts that it was ours.
func createRecoveryBlock1Sender(t *testing.T, afterBlock1 func(s *sender)) (func(), *sender) {
	s := &sender{
		partitionID: to.StringPtr("0"),
		hub: &Hub{
			namespace: &namespace{},
		},
	}

	// current link: tests pass "the-actual-link-id" to simulate a match
	s.sender.Store(&fakeSender{
		id: "the-actual-link-id",
	})

	s.cond = &sync.Cond{
		L: FakeLocker{
			mu: &sync.Mutex{},
			afterBlock1: func() {
				afterBlock1(s)
				// abort before block 2 so no real teardown/rebuild is attempted
				panic("Panicking to exit before block 2")
			},
		}}

	return func() {
		// recover() works here because the caller invokes this via defer
		require.EqualValues(t, recover(), "Panicking to exit before block 2")
	}, s
}