From fe9b73659aba293ba8fef46256a6ac5405d669d2 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Sat, 31 Jul 2021 01:44:11 +0000 Subject: [PATCH 01/13] Recreating fix. Basically - if the link has already swapped out by the time Recover() runs we don't need to recover again. And if we do need to recover, we should try to just recreate the link first, and not the entire connection since that's more expensive. --- sender.go | 44 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/sender.go b/sender.go index 89a992c..219a861 100644 --- a/sender.go +++ b/sender.go @@ -67,7 +67,9 @@ type ( // amqpSender is the bare minimum we need from an AMQP based sender. // (used for testing) + // Implemented by *amqp.Sender amqpSender interface { + Id() string Send(ctx context.Context, msg *amqp.Message) error Close(ctx context.Context) error } @@ -122,12 +124,22 @@ func (s *sender) amqpSender() amqpSender { // it won't prevent excessive connection recovery. E.g. if a Recover() is in progress // and is in block 2, any additional calls to Recover() will wait at block 1 to // restart the recovery process once block 2 exits. -func (s *sender) Recover(ctx context.Context) error { +func (s *sender) Recover(ctx context.Context, currentLinkID string) error { span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.Recover") defer span.End() + recover := false + // acquire exclusive lock to see if this goroutine should recover s.cond.L.Lock() // block 1 + + // if the link they started with has already been closed and removed we don't + // need to trigger an additional recovery. + if s.getAmqpSender().Id() != currentLinkID { + s.cond.L.Unlock() + return nil + } + if !s.recovering { // another goroutine isn't recovering, so this one will tab.For(ctx).Debug("will recover connection") @@ -147,9 +159,20 @@ func (s *sender) Recover(ctx context.Context) error { defer cancel() // update shared state s.cond.L.Lock() // block 2 + + // another optimization - if we can, try to just recreate the session and link (and not the + // entire connection, which is way more expensive) _ = s.amqpSender().Close(closeCtx) _ = s.session.Close(closeCtx) - _ = s.connection.Close() + err = s.newSessionAndLink(ctx) + + if err != nil { + // less expensive recovery not possible, closing it all down instead. + _ = s.amqpSender().Close(closeCtx) + _ = s.session.Close(closeCtx) + _ = s.connection.Close() + } + err = s.newSessionAndLink(ctx) s.recovering = false s.cond.L.Unlock() @@ -238,7 +261,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error { // create a per goroutine copy as Duration() and Reset() modify its state backoff := s.retryOptions.recoveryBackoff.Copy() - recvr := func(err error, recover bool) { + recvr := func(linkID string, err error, recover bool) { duration := backoff.Duration() tab.For(ctx).Debug("amqp error, delaying " + strconv.FormatInt(int64(duration/time.Millisecond), 10) + " millis: " + err.Error()) select { @@ -249,7 +272,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error { return } if recover { - err = s.Recover(ctx) + err = s.Recover(ctx, linkID) if err != nil { tab.For(ctx).Debug("failed to recover connection") } else { @@ -265,7 +288,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error { return sendMessage(ctx, s.amqpSender, s.retryOptions.maxRetries, msg, recvr) } -func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries int, msg *amqp.Message, recoverConnection func(err error, recover bool)) error { +func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries int, msg *amqp.Message, recoverConnection func(linkID string, err error, recover bool)) error { var lastError error // maxRetries >= 0 == finite retries @@ -275,7 +298,8 @@ func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries in case <-ctx.Done(): return ctx.Err() default: - err := getAmqpSender().Send(ctx, msg) + sender := getAmqpSender() + err := sender.Send(ctx, msg) if err == nil { return err } @@ -285,18 +309,18 @@ func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries in switch e := err.(type) { case *amqp.Error: if e.Condition == errorServerBusy || e.Condition == errorTimeout { - recoverConnection(err, false) + recoverConnection(sender.Id(), err, false) break } - recoverConnection(err, true) + recoverConnection(sender.Id(), err, true) case *amqp.DetachError, net.Error: - recoverConnection(err, true) + recoverConnection(sender.Id(), err, true) default: if !isRecoverableCloseError(err) { return err } - recoverConnection(err, true) + recoverConnection(sender.Id(), err, true) } } } From e6be745f5246f1c0c10bfb1376bbd2be231a74a0 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Sat, 31 Jul 2021 01:45:19 +0000 Subject: [PATCH 02/13] Fix compile error --- sender.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sender.go b/sender.go index 219a861..b37db90 100644 --- a/sender.go +++ b/sender.go @@ -135,7 +135,7 @@ func (s *sender) Recover(ctx context.Context, currentLinkID string) error { // if the link they started with has already been closed and removed we don't // need to trigger an additional recovery. - if s.getAmqpSender().Id() != currentLinkID { + if s.amqpSender().Id() != currentLinkID { s.cond.L.Unlock() return nil } From 72f6fc158b51bc1018a731ed3322de73d1b6770f Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Mon, 2 Aug 2021 21:11:46 +0000 Subject: [PATCH 03/13] Updating go-amqp dependency, doing `go mod tidy`. --- go.mod | 11 +++++------ go.sum | 34 ++++++++++++++++++++-------------- sender.go | 12 ++++++------ 3 files changed, 31 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index 6360f59..32b1133 100644 --- a/go.mod +++ b/go.mod @@ -5,15 +5,14 @@ go 1.13 require ( github.com/Azure/azure-amqp-common-go/v3 v3.0.1 github.com/Azure/azure-pipeline-go v0.1.9 - github.com/Azure/azure-sdk-for-go v37.1.0+incompatible + github.com/Azure/azure-sdk-for-go v51.1.0+incompatible github.com/Azure/azure-storage-blob-go v0.6.0 - github.com/Azure/go-amqp v0.13.1 - github.com/Azure/go-autorest/autorest v0.11.3 - github.com/Azure/go-autorest/autorest/adal v0.9.0 + github.com/Azure/go-amqp v0.13.10 + github.com/Azure/go-autorest/autorest v0.11.18 + github.com/Azure/go-autorest/autorest/adal v0.9.13 github.com/Azure/go-autorest/autorest/azure/auth v0.4.2 github.com/Azure/go-autorest/autorest/date v0.3.0 - github.com/Azure/go-autorest/autorest/to v0.3.0 - github.com/Azure/go-autorest/autorest/validation v0.2.0 // indirect + github.com/Azure/go-autorest/autorest/to v0.4.0 github.com/devigned/tab v0.1.1 github.com/joho/godotenv v1.3.0 github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 diff --git a/go.sum b/go.sum index a1fc962..8fc2837 100644 --- a/go.sum +++ b/go.sum @@ -3,24 +3,26 @@ github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGib github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.1.9 h1:u7JFb9fFTE6Y/j8ae2VK33ePrRqJqoCM/IWkQdAZ+rg= github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= -github.com/Azure/azure-sdk-for-go v37.1.0+incompatible h1:aFlw3lP7ZHQi4m1kWCpcwYtczhDkGhDoRaMTaxcOf68= -github.com/Azure/azure-sdk-for-go v37.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w= +github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-storage-blob-go v0.6.0 h1:SEATKb3LIHcaSIX+E6/K4kJpwfuozFEsmt5rS56N6CE= github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= github.com/Azure/go-amqp v0.13.0/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs= -github.com/Azure/go-amqp v0.13.1 h1:dXnEJ89Hf7wMkcBbLqvocZlM4a3uiX9uCxJIvU77+Oo= -github.com/Azure/go-amqp v0.13.1/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs= +github.com/Azure/go-amqp v0.13.10 h1:+W1UMoJUFNwyzmslWxhxkM2VZjZprJ2tO2AOYPReeZo= +github.com/Azure/go-amqp v0.13.10/go.mod h1:D5ZrjQqB1dyp1A+G73xeL/kNn7D5qHJIIsNNps7YNmk= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI= github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0= -github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/TmQd5sSI5u2Ws= github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw= +github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM= +github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA= github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0= github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc= github.com/Azure/go-autorest/autorest/adal v0.8.1/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q= -github.com/Azure/go-autorest/autorest/adal v0.9.0 h1:SigMbuFNuKgc1xcGhaeapbh+8fgsu+GxgDRFyg7f5lM= github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg= +github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q= +github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= github.com/Azure/go-autorest/autorest/azure/auth v0.4.2 h1:iM6UAvjR97ZIeR93qTcwpKNMpV+/FTWjwEbuPD495Tk= github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM= github.com/Azure/go-autorest/autorest/azure/cli v0.3.1 h1:LXl088ZQlP0SBppGFsRZonW6hSvwgL5gRByMbvUbx8U= @@ -32,15 +34,17 @@ github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSY github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN3SVSiiO77gL2j2ronKKP0syM= -github.com/Azure/go-autorest/autorest/mocks v0.4.0 h1:z20OWOSG5aCye0HEkDp6TPmP17ZcfeMxPi6HnSALa8c= github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= -github.com/Azure/go-autorest/autorest/to v0.3.0 h1:zebkZaadz7+wIQYgC7GXaz3Wb28yKYfVkkBKwc38VF8= -github.com/Azure/go-autorest/autorest/to v0.3.0/go.mod h1:MgwOyqaIuKdG4TL/2ywSsIWKAfJfgHDo8ObuUk3t5sA= -github.com/Azure/go-autorest/autorest/validation v0.2.0 h1:15vMO4y76dehZSq7pAaOLQxC6dZYsSrj2GQpflyM/L4= -github.com/Azure/go-autorest/autorest/validation v0.2.0/go.mod h1:3EEqHnBxQGHXRYq3HT1WyXAvT7LLY3tl70hw6tQIbjI= +github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk= +github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= +github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk= +github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= +github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac= +github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E= github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc= -github.com/Azure/go-autorest/logger v0.2.0 h1:e4RVHVZKC5p6UANLJHkM4OfR1UKZPj8Wt8Pcx+3oqrE= github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= +github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg= +github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= @@ -49,10 +53,11 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA= github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= -github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TRo4= github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8= +github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= +github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k= @@ -83,8 +88,9 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE= +golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= diff --git a/sender.go b/sender.go index b37db90..95f85f5 100644 --- a/sender.go +++ b/sender.go @@ -69,7 +69,7 @@ type ( // (used for testing) // Implemented by *amqp.Sender amqpSender interface { - Id() string + ID() string Send(ctx context.Context, msg *amqp.Message) error Close(ctx context.Context) error } @@ -135,7 +135,7 @@ func (s *sender) Recover(ctx context.Context, currentLinkID string) error { // if the link they started with has already been closed and removed we don't // need to trigger an additional recovery. - if s.amqpSender().Id() != currentLinkID { + if s.amqpSender().ID() != currentLinkID { s.cond.L.Unlock() return nil } @@ -309,18 +309,18 @@ func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries in switch e := err.(type) { case *amqp.Error: if e.Condition == errorServerBusy || e.Condition == errorTimeout { - recoverConnection(sender.Id(), err, false) + recoverConnection(sender.ID(), err, false) break } - recoverConnection(sender.Id(), err, true) + recoverConnection(sender.ID(), err, true) case *amqp.DetachError, net.Error: - recoverConnection(sender.Id(), err, true) + recoverConnection(sender.ID(), err, true) default: if !isRecoverableCloseError(err) { return err } - recoverConnection(sender.Id(), err, true) + recoverConnection(sender.ID(), err, true) } } } From fa1b40845195d98ba0b805bd630b882247b87ea4 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 3 Aug 2021 00:14:02 +0000 Subject: [PATCH 04/13] - Adding in a simple stress test that can run through the recovery code in a realistic way. - Adjusting tests to handle the link ID - Changing Recover() so I don't break the public facing API by breaking it apart into an internal version and a public facing one. (the internal one let's us pass in the expected link ID to avoid the expensive recovery) --- hub.go | 6 +- hub_test.go | 8 ++ internal/stress/stdoutTracer.go | 70 +++++++++++++++++ internal/stress/throttling/throttling.go | 95 ++++++++++++++++++++++++ sender.go | 41 +++++----- sender_test.go | 35 ++++++--- 6 files changed, 222 insertions(+), 33 deletions(-) create mode 100644 internal/stress/stdoutTracer.go create mode 100644 internal/stress/throttling/throttling.go diff --git a/hub.go b/hub.go index 7efbdde..5c8ae5f 100644 --- a/hub.go +++ b/hub.go @@ -779,17 +779,13 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) { } func isRecoverableCloseError(err error) bool { - return isConnectionClosed(err) || isSessionClosed(err) || isLinkClosed(err) + return isConnectionClosed(err) || isSessionClosed(err) || err == amqp.ErrLinkDetached } func isConnectionClosed(err error) bool { return err == amqp.ErrConnClosed } -func isLinkClosed(err error) bool { - return err == amqp.ErrLinkClosed -} - func isSessionClosed(err error) bool { return err == amqp.ErrSessionClosed } diff --git a/hub_test.go b/hub_test.go index d8fdc8f..68110d5 100644 --- a/hub_test.go +++ b/hub_test.go @@ -41,6 +41,7 @@ import ( "github.com/Azure/azure-amqp-common-go/v3/auth" "github.com/Azure/azure-amqp-common-go/v3/sas" "github.com/Azure/azure-amqp-common-go/v3/uuid" + "github.com/Azure/go-amqp" "github.com/Azure/go-autorest/autorest/azure" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -765,3 +766,10 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) { t.Fatalf("did not set appropriate endpoint suffix. Expected: %v, Received: %v", azure.PublicCloud.ServiceBusEndpointSuffix, h.namespace.host) } } + +func TestIsRecoverableCloseError(t *testing.T) { + require.True(t, isRecoverableCloseError(amqp.ErrLinkDetached)) + + // if the caller closes a link we shouldn't reopen or create a new one to replace it + require.False(t, isRecoverableCloseError(amqp.ErrLinkClosed)) +} diff --git a/internal/stress/stdoutTracer.go b/internal/stress/stdoutTracer.go new file mode 100644 index 0000000..777681a --- /dev/null +++ b/internal/stress/stdoutTracer.go @@ -0,0 +1,70 @@ +package stress + +import ( + "context" + "log" + + "github.com/devigned/tab" +) + +// StderrTracer is a wrapper around a NoOpTracer so we can add in a +// really simple stderr logger. Useful for seeing some of the internal +// state changes (like retries) that aren't normally customer visible. +type StderrTracer struct { + NoOpTracer *tab.NoOpTracer +} + +func (ft *StderrTracer) StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, tab.Spanner) { + return ft.NoOpTracer.StartSpan(ctx, operationName, opts...) +} + +func (ft *StderrTracer) StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier tab.Carrier, opts ...interface{}) (context.Context, tab.Spanner) { + return ft.NoOpTracer.StartSpanWithRemoteParent(ctx, operationName, carrier, opts...) +} + +func (ft *StderrTracer) FromContext(ctx context.Context) tab.Spanner { + return &stderrSpanner{ + spanner: ft.NoOpTracer.FromContext(ctx), + } +} + +func (ft *StderrTracer) NewContext(parent context.Context, span tab.Spanner) context.Context { + return ft.NoOpTracer.NewContext(parent, span) +} + +type stderrSpanner struct { + spanner tab.Spanner +} + +func (s *stderrSpanner) AddAttributes(attributes ...tab.Attribute) {} + +func (s *stderrSpanner) End() {} + +func (s *stderrSpanner) Logger() tab.Logger { + return &stderrLogger{} +} + +func (s *stderrSpanner) Inject(carrier tab.Carrier) error { + return nil +} + +func (s *stderrSpanner) InternalSpan() interface{} { + return s.spanner.InternalSpan() +} + +type stderrLogger struct{} + +func (l *stderrLogger) Info(msg string, attributes ...tab.Attribute) { + log.Printf("INFO: %s", msg) +} +func (l *stderrLogger) Error(err error, attributes ...tab.Attribute) { + log.Printf("ERROR: %s", err.Error()) +} + +func (l *stderrLogger) Fatal(msg string, attributes ...tab.Attribute) { + log.Printf("FATAL: %s", msg) +} + +func (l *stderrLogger) Debug(msg string, attributes ...tab.Attribute) { + log.Printf("DEBUG: %s", msg) +} diff --git a/internal/stress/throttling/throttling.go b/internal/stress/throttling/throttling.go new file mode 100644 index 0000000..47b91f4 --- /dev/null +++ b/internal/stress/throttling/throttling.go @@ -0,0 +1,95 @@ +package main + +import ( + "context" + "log" + "os" + "sync" + "sync/atomic" + "time" + + eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/Azure/azure-event-hubs-go/v3/internal/stress" + "github.com/devigned/tab" + "github.com/joho/godotenv" +) + +func main() { + godotenv.Load("../../../.env") + cs := os.Getenv("EVENTHUB_CONNECTION_STRING") + + hub, err := eventhub.NewHubFromConnectionString(cs) + + if err != nil { + log.Fatalf("Failed to create hub: %s", err.Error()) + } + + // Generate some large batches of messages and send them in parallel. + // The Go SDK is fast enough that this will cause a 1TU instance to throttle + // us, allowing you to see how our code reacts to it. + tab.Register(&stress.StderrTracer{NoOpTracer: &tab.NoOpTracer{}}) + lastExpectedId := sendMessages(hub) + + log.Printf("Sending complete, last expected ID = %d", lastExpectedId) +} + +func sendMessages(hub *eventhub.Hub) int64 { + var batches []eventhub.BatchIterator + nextTestId := int64(0) + + log.Printf("Creating event batches") + + for i := 0; i < 10; i++ { + batches = append(batches, createEventBatch(&nextTestId)) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + + wg := &sync.WaitGroup{} + + log.Printf("Sending event batches") + + var totalBatches int64 = 0 + + for i, batch := range batches { + wg.Add(1) + + go func(idx int, batch eventhub.BatchIterator) { + err := hub.SendBatch(ctx, batch) + + if err != nil { + log.Fatalf("ERROR sending batch: %s", err.Error()) + } + + wg.Done() + atomic.AddInt64(&totalBatches, 1) + log.Printf("[%d/%d] sent...", totalBatches, len(batches)) + }(i, batch) + } + + wg.Wait() + + return nextTestId - 1 +} + +func createEventBatch(testId *int64) eventhub.BatchIterator { + var events []*eventhub.Event + var data = [1024]byte{1} + + // simple minimum + batchSize := 880 + + for i := 0; i < batchSize; i++ { + events = append(events, &eventhub.Event{ + Data: data[:], + Properties: map[string]interface{}{ + "testId": *testId, + }, + }) + + *testId++ + } + + return eventhub.NewEventBatchIterator(events...) +} diff --git a/sender.go b/sender.go index 95f85f5..962cfef 100644 --- a/sender.go +++ b/sender.go @@ -119,12 +119,18 @@ func (s *sender) amqpSender() amqpSender { return s.sender.Load().(*amqp.Sender) } -// Recover will attempt to close the current session and link, then rebuild them. -// Note that while the implementation will ensure that Recover() is goroutine safe -// it won't prevent excessive connection recovery. E.g. if a Recover() is in progress -// and is in block 2, any additional calls to Recover() will wait at block 1 to -// restart the recovery process once block 2 exits. -func (s *sender) Recover(ctx context.Context, currentLinkID string) error { +// Recover will attempt to close the current connectino, session and link, then rebuild them. +func (s *sender) Recover(ctx context.Context) error { + return s.recoverWithExpectedLinkId(ctx, "") +} + +// recoverWithExpectedLinkId attemps to recover the link as cheaply as possible. +// - It does not recover the link if expectedLinkID is non-empty and does NOT match +// the current link ID, as this would indicate that the previous bad link has +// already been closed and removed. +// - When recovering, it attempts to recover just the link first. If that fails then it +// will try to recover the entire connection. +func (s *sender) recoverWithExpectedLinkId(ctx context.Context, expectedLinkID string) error { span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.Recover") defer span.End() @@ -135,12 +141,9 @@ func (s *sender) Recover(ctx context.Context, currentLinkID string) error { // if the link they started with has already been closed and removed we don't // need to trigger an additional recovery. - if s.amqpSender().ID() != currentLinkID { - s.cond.L.Unlock() - return nil - } - - if !s.recovering { + if s.amqpSender().ID() != expectedLinkID { + tab.For(ctx).Debug("original linkID does not match, no recovery necessary") + } else if !s.recovering { // another goroutine isn't recovering, so this one will tab.For(ctx).Debug("will recover connection") s.recovering = true @@ -150,7 +153,9 @@ func (s *sender) Recover(ctx context.Context, currentLinkID string) error { tab.For(ctx).Debug("waiting for connection to recover") s.cond.Wait() } + s.cond.L.Unlock() + var err error if recover { tab.For(ctx).Debug("recovering connection") @@ -272,7 +277,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error { return } if recover { - err = s.Recover(ctx, linkID) + err = s.recoverWithExpectedLinkId(ctx, linkID) if err != nil { tab.For(ctx).Debug("failed to recover connection") } else { @@ -288,7 +293,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error { return sendMessage(ctx, s.amqpSender, s.retryOptions.maxRetries, msg, recvr) } -func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries int, msg *amqp.Message, recoverConnection func(linkID string, err error, recover bool)) error { +func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries int, msg *amqp.Message, recoverLink func(linkID string, err error, recover bool)) error { var lastError error // maxRetries >= 0 == finite retries @@ -309,18 +314,18 @@ func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries in switch e := err.(type) { case *amqp.Error: if e.Condition == errorServerBusy || e.Condition == errorTimeout { - recoverConnection(sender.ID(), err, false) + recoverLink(sender.ID(), err, false) break } - recoverConnection(sender.ID(), err, true) + recoverLink(sender.ID(), err, true) case *amqp.DetachError, net.Error: - recoverConnection(sender.ID(), err, true) + recoverLink(sender.ID(), err, true) default: if !isRecoverableCloseError(err) { return err } - recoverConnection(sender.ID(), err, true) + recoverLink(sender.ID(), err, true) } } } diff --git a/sender_test.go b/sender_test.go index 8ee76a7..a51755a 100644 --- a/sender_test.go +++ b/sender_test.go @@ -17,10 +17,15 @@ type testAmqpSender struct { } type recoveryCall struct { + linkID string err error recover bool } +func (s *testAmqpSender) ID() string { + return "sender-id" +} + func (s *testAmqpSender) Send(ctx context.Context, msg *amqp.Message) error { var err error @@ -45,8 +50,8 @@ func TestSenderRetries(t *testing.T) { return sender } - recover := func(err error, recover bool) { - recoverCalls = append(recoverCalls, recoveryCall{err, recover}) + recover := func(linkID string, err error, recover bool) { + recoverCalls = append(recoverCalls, recoveryCall{linkID, err, recover}) } t.Run("SendSucceedsOnFirstTry", func(t *testing.T) { @@ -63,7 +68,7 @@ func TestSenderRetries(t *testing.T) { recoverCalls = nil sender = &testAmqpSender{ sendErrors: []error{ - amqp.ErrLinkClosed, + amqp.ErrLinkDetached, amqp.ErrSessionClosed, errors.New("We'll never attempt to use this one since we ran out of retries")}, } @@ -76,10 +81,12 @@ func TestSenderRetries(t *testing.T) { assert.EqualValues(t, 2, sender.sendCount) assert.EqualValues(t, []recoveryCall{ { - err: amqp.ErrLinkClosed, + linkID: "sender-id", + err: amqp.ErrLinkDetached, recover: true, }, { + linkID: "sender-id", err: amqp.ErrSessionClosed, recover: true, }, @@ -124,18 +131,21 @@ func TestSenderRetries(t *testing.T) { assert.EqualValues(t, 4, sender.sendCount) assert.EqualValues(t, []recoveryCall{ { + linkID: "sender-id", err: &amqp.Error{ Condition: errorServerBusy, }, recover: false, }, { + linkID: "sender-id", err: &amqp.Error{ Condition: errorTimeout, }, recover: false, }, { + linkID: "sender-id", err: &amqp.Error{ Condition: amqp.ErrorNotImplemented, }, @@ -158,10 +168,12 @@ func TestSenderRetries(t *testing.T) { assert.EqualValues(t, 3, sender.sendCount) assert.EqualValues(t, []recoveryCall{ { + linkID: "sender-id", err: &amqp.DetachError{}, recover: true, }, { + linkID: "sender-id", err: &net.DNSError{}, recover: true, }, @@ -173,7 +185,7 @@ func TestSenderRetries(t *testing.T) { sender = &testAmqpSender{ sendErrors: []error{ amqp.ErrConnClosed, - amqp.ErrLinkClosed, + amqp.ErrLinkDetached, amqp.ErrSessionClosed, }, } @@ -183,14 +195,17 @@ func TestSenderRetries(t *testing.T) { assert.EqualValues(t, 4, sender.sendCount) assert.EqualValues(t, []recoveryCall{ { + linkID: "sender-id", err: amqp.ErrConnClosed, recover: true, }, { - err: amqp.ErrLinkClosed, + linkID: "sender-id", + err: amqp.ErrLinkDetached, recover: true, }, { + linkID: "sender-id", err: amqp.ErrSessionClosed, recover: true, }, @@ -213,9 +228,9 @@ func TestSenderRetries(t *testing.T) { assert.NoError(t, err, "Last call succeeds") assert.EqualValues(t, 3+1, sender.sendCount) assert.EqualValues(t, recoverCalls, []recoveryCall{ - {err: amqp.ErrConnClosed, recover: true}, - {err: amqp.ErrConnClosed, recover: true}, - {err: amqp.ErrConnClosed, recover: true}, + {linkID: "sender-id", err: amqp.ErrConnClosed, recover: true}, + {linkID: "sender-id", err: amqp.ErrConnClosed, recover: true}, + {linkID: "sender-id", err: amqp.ErrConnClosed, recover: true}, }) }) @@ -232,7 +247,7 @@ func TestSenderRetries(t *testing.T) { assert.EqualValues(t, amqp.ErrConnClosed, err) assert.EqualValues(t, maxRetries+1, sender.sendCount) assert.EqualValues(t, recoverCalls, []recoveryCall{ - {err: amqp.ErrConnClosed, recover: true}, + {linkID: "sender-id", err: amqp.ErrConnClosed, recover: true}, }) }) From 26a1d8435e3369a541fab22461380070b313dcb0 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 3 Aug 2021 00:16:51 +0000 Subject: [PATCH 05/13] ID() --- sender.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sender.go b/sender.go index 962cfef..b4d6e9e 100644 --- a/sender.go +++ b/sender.go @@ -121,16 +121,16 @@ func (s *sender) amqpSender() amqpSender { // Recover will attempt to close the current connectino, session and link, then rebuild them. func (s *sender) Recover(ctx context.Context) error { - return s.recoverWithExpectedLinkId(ctx, "") + return s.recoverWithExpectedLinkID(ctx, "") } -// recoverWithExpectedLinkId attemps to recover the link as cheaply as possible. +// recoverWithExpectedLinkID attemps to recover the link as cheaply as possible. // - It does not recover the link if expectedLinkID is non-empty and does NOT match // the current link ID, as this would indicate that the previous bad link has // already been closed and removed. // - When recovering, it attempts to recover just the link first. If that fails then it // will try to recover the entire connection. -func (s *sender) recoverWithExpectedLinkId(ctx context.Context, expectedLinkID string) error { +func (s *sender) recoverWithExpectedLinkID(ctx context.Context, expectedLinkID string) error { span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.Recover") defer span.End() @@ -277,7 +277,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error { return } if recover { - err = s.recoverWithExpectedLinkId(ctx, linkID) + err = s.recoverWithExpectedLinkID(ctx, linkID) if err != nil { tab.For(ctx).Debug("failed to recover connection") } else { From 26a33e68d8a3a1ffd409f4b0e9228806c03af2a7 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 3 Aug 2021 00:25:54 +0000 Subject: [PATCH 06/13] Update changelog with fix we just made. --- changelog.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/changelog.md b/changelog.md index fe887f3..7f9742a 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,8 @@ # Change Log +## `v3.3.12` +- Fix bug in sender.Recover() where recovery could get stuck when a link was throttled. [PR#232](#https://github.com/Azure/azure-event-hubs-go/pull/232) + ## `v3.3.11` - Allow for controlling the maximum retry count when sending messages. [#226](https://github.com/Azure/azure-event-hubs-go/issues/226) From 489579cbd9b06ccd1fcbdfd1f38eca543a3af993 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 3 Aug 2021 01:20:28 +0000 Subject: [PATCH 07/13] If the expectedLinkID is empty we just skip the ID check altogether and fallback to the normal recovery checks. --- sender.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sender.go b/sender.go index b4d6e9e..ed24c14 100644 --- a/sender.go +++ b/sender.go @@ -141,7 +141,7 @@ func (s *sender) recoverWithExpectedLinkID(ctx context.Context, expectedLinkID s // if the link they started with has already been closed and removed we don't // need to trigger an additional recovery. - if s.amqpSender().ID() != expectedLinkID { + if expectedLinkID != "" && s.amqpSender().ID() != expectedLinkID { tab.For(ctx).Debug("original linkID does not match, no recovery necessary") } else if !s.recovering { // another goroutine isn't recovering, so this one will From ee14f14fcd7e989f5e0718f699ac09486bc9a029 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 3 Aug 2021 01:30:11 +0000 Subject: [PATCH 08/13] Added test that shows ErrLinkClosed vs ErrLinkDetached --- sender_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sender_test.go b/sender_test.go index a51755a..ab0f3ad 100644 --- a/sender_test.go +++ b/sender_test.go @@ -110,6 +110,21 @@ func TestSenderRetries(t *testing.T) { assert.Empty(t, recoverCalls, "No recovery attempts should happen for non-recoverable errors") }) + t.Run("SendIsNotRecoverableIfLinkIsClosed", func(*testing.T) { + recoverCalls = nil + sender = &testAmqpSender{ + sendErrors: []error{ + amqp.ErrLinkClosed, // this is no longer considered a retryable error (ErrLinkDetached is, however) + }, + } + + actualErr := sendMessage(context.TODO(), getAmqpSender, 5, nil, recover) + + assert.EqualValues(t, amqp.ErrLinkClosed, actualErr) + assert.EqualValues(t, 1, sender.sendCount) + assert.Empty(t, recoverCalls, "No recovery attempts should happen for non-recoverable errors") + }) + t.Run("SendWithAmqpErrors", func(*testing.T) { recoverCalls = nil sender = &testAmqpSender{ From 0ce0412a8492390c327b09eef58959a148df834d Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 3 Aug 2021 02:32:34 +0000 Subject: [PATCH 09/13] Removing the code that tries to minimally recreate the link. newSessionAndLink is a little too modest about it's capabilities - it also creates a new connection. This part wasn't the highest value for recovery compared to just skipping recovery altogether if the link had cycled, so for now I'll just remove it and replace it with a TODO. --- sender.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/sender.go b/sender.go index ed24c14..9b408ff 100644 --- a/sender.go +++ b/sender.go @@ -165,20 +165,14 @@ func (s *sender) recoverWithExpectedLinkID(ctx context.Context, expectedLinkID s // update shared state s.cond.L.Lock() // block 2 - // another optimization - if we can, try to just recreate the session and link (and not the - // entire connection, which is way more expensive) + // TODO: we should be able to recover more quickly if we don't close the connection + // to recover (and just attempt to recreate the link). newSessionAndLink, currently, + // creates a new connection so we'd need to change that. _ = s.amqpSender().Close(closeCtx) _ = s.session.Close(closeCtx) + _ = s.connection.Close() err = s.newSessionAndLink(ctx) - if err != nil { - // less expensive recovery not possible, closing it all down instead. - _ = s.amqpSender().Close(closeCtx) - _ = s.session.Close(closeCtx) - _ = s.connection.Close() - } - - err = s.newSessionAndLink(ctx) s.recovering = false s.cond.L.Unlock() // signal to waiters that recovery is complete @@ -348,7 +342,7 @@ func (s *sender) getFullIdentifier() string { return s.hub.namespace.getEntityAudience(s.getAddress()) } -// newSessionAndLink will replace the existing session and link +// newSessionAndLink will replace the existing connection, session and link func (s *sender) newSessionAndLink(ctx context.Context) error { span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.newSessionAndLink") defer span.End() From 74c90a77d56f9e3511926aed7a4657935e92ea18 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 3 Aug 2021 06:22:58 +0000 Subject: [PATCH 10/13] Add in some tests for block 1 of the recovery code. Not perfect (maybe even repulsive) but they work and can support us for more changes later. --- sender.go | 5 ++- sender_test.go | 105 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 2 deletions(-) diff --git a/sender.go b/sender.go index 9b408ff..1675d0d 100644 --- a/sender.go +++ b/sender.go @@ -116,7 +116,8 @@ func (h *Hub) newSender(ctx context.Context, retryOptions *senderRetryOptions) ( } func (s *sender) amqpSender() amqpSender { - return s.sender.Load().(*amqp.Sender) + // in reality, an *amqp.Sender + return s.sender.Load().(amqpSender) } // Recover will attempt to close the current connectino, session and link, then rebuild them. @@ -125,7 +126,7 @@ func (s *sender) Recover(ctx context.Context) error { } // recoverWithExpectedLinkID attemps to recover the link as cheaply as possible. -// - It does not recover the link if expectedLinkID is non-empty and does NOT match +// - It does not recover the link if expectedLinkID is not "" and does NOT match // the current link ID, as this would indicate that the previous bad link has // already been closed and removed. // - When recovering, it attempts to recover just the link first. If that fails then it diff --git a/sender_test.go b/sender_test.go index ab0f3ad..ac7cd69 100644 --- a/sender_test.go +++ b/sender_test.go @@ -4,10 +4,13 @@ import ( "context" "errors" "net" + "sync" "testing" "github.com/Azure/go-amqp" + "github.com/Azure/go-autorest/autorest/to" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // conforms to amqpSender @@ -284,3 +287,105 @@ func TestSenderRetries(t *testing.T) { assert.Empty(t, recoverCalls) }) } + +type FakeLocker struct { + afterBlock1 func() + mu *sync.Mutex +} + +func (l FakeLocker) Lock() { + l.mu.Lock() +} +func (l FakeLocker) Unlock() { + l.afterBlock1() + l.mu.Unlock() +} + +// TestRecoveryBlock1 tests recoverWithExpectedLinkID function's first "block" of code that +// decides if we are going to recover the link, ignore it, or wait for an in-progress recovery to +// complete. +func TestRecoveryBlock1(t *testing.T) { + t.Run("Empty link ID skips link ID checking and just does recovery", func(t *testing.T) { + cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) { + require.True(t, s.recovering) + }) + + defer cleanup() + + sender.recoverWithExpectedLinkID(context.TODO(), "") + }) + + t.Run("Matching link ID does recovery", func(t *testing.T) { + cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) { + require.True(t, s.recovering, "s.recovering should be true since the lock is available and we have our expected link ID matches") + }) + + defer cleanup() + + sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id") + }) + + t.Run("Non-matching link ID skips recovery", func(t *testing.T) { + cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) { + require.False(t, s.recovering, "s.recovering should be false - the link ID isn't current, so nothing needs to be closed/recovered") + }) + + defer cleanup() + + sender.recoverWithExpectedLinkID(context.TODO(), "non-matching-link-id") + }) + + // TODO: can't quite test this one + // t.Run("Already recovering, should wait for condition variable", func(t *testing.T) { + // cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) { + // }) + + // defer cleanup() + + // sender.recovering = true // oops, someone else is already recovering + // sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id") + // }) +} + +type fakeSender struct { + id string + closed bool +} + +func (s *fakeSender) ID() string { + return s.id +} + +func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message) error { + return nil +} +func (s *fakeSender) Close(ctx context.Context) error { + s.closed = true + return nil +} + +func createRecoveryBlock1Sender(t *testing.T, afterBlock1 func(s *sender)) (func(), *sender) { + s := &sender{ + partitionID: to.StringPtr("0"), + hub: &Hub{ + namespace: &namespace{}, + }, + } + + s.sender.Store(&fakeSender{ + id: "the-actual-link-id", + }) + + s.cond = &sync.Cond{ + L: FakeLocker{ + mu: &sync.Mutex{}, + afterBlock1: func() { + afterBlock1(s) + panic("Panicking to exit before block 2") + }, + }} + + return func() { + require.EqualValues(t, recover(), "Panicking to exit before block 2") + }, s +} From b3e3b9b4fb5a6211d9f662afec5e7fa07825cea8 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 3 Aug 2021 18:14:42 +0000 Subject: [PATCH 11/13] Comment no longer applies. Will need to revisit in the future. --- sender.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/sender.go b/sender.go index 1675d0d..9bb44d2 100644 --- a/sender.go +++ b/sender.go @@ -129,8 +129,6 @@ func (s *sender) Recover(ctx context.Context) error { // - It does not recover the link if expectedLinkID is not "" and does NOT match // the current link ID, as this would indicate that the previous bad link has // already been closed and removed. -// - When recovering, it attempts to recover just the link first. If that fails then it -// will try to recover the entire connection. func (s *sender) recoverWithExpectedLinkID(ctx context.Context, expectedLinkID string) error { span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.Recover") defer span.End() From c257820f3251ddbb2a3ceb55d632539264de3098 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 3 Aug 2021 18:17:03 +0000 Subject: [PATCH 12/13] Rename file to match what it does and add in comments for linting. --- internal/stress/{stdoutTracer.go => stderrTracer.go} | 4 ++++ 1 file changed, 4 insertions(+) rename internal/stress/{stdoutTracer.go => stderrTracer.go} (88%) diff --git a/internal/stress/stdoutTracer.go b/internal/stress/stderrTracer.go similarity index 88% rename from internal/stress/stdoutTracer.go rename to internal/stress/stderrTracer.go index 777681a..7a30d1e 100644 --- a/internal/stress/stdoutTracer.go +++ b/internal/stress/stderrTracer.go @@ -14,20 +14,24 @@ type StderrTracer struct { NoOpTracer *tab.NoOpTracer } +// StartSpan forwards to NoOpTracer.StartSpan. func (ft *StderrTracer) StartSpan(ctx context.Context, operationName string, opts ...interface{}) (context.Context, tab.Spanner) { return ft.NoOpTracer.StartSpan(ctx, operationName, opts...) } +// StartSpanWithRemoteParent forwards to NoOpTracer.StartSpanWithRemoteParent. func (ft *StderrTracer) StartSpanWithRemoteParent(ctx context.Context, operationName string, carrier tab.Carrier, opts ...interface{}) (context.Context, tab.Spanner) { return ft.NoOpTracer.StartSpanWithRemoteParent(ctx, operationName, carrier, opts...) } +// FromContext creates a stderrSpanner to allow for our stderrLogger to be created. func (ft *StderrTracer) FromContext(ctx context.Context) tab.Spanner { return &stderrSpanner{ spanner: ft.NoOpTracer.FromContext(ctx), } } +// NewContext forwards to NoOpTracer.NewContext func (ft *StderrTracer) NewContext(parent context.Context, span tab.Spanner) context.Context { return ft.NoOpTracer.NewContext(parent, span) }