From 296ac2ca91f1bdf7d7e99c6acf9705a0718b857c Mon Sep 17 00:00:00 2001 From: Joel Hendrix Date: Wed, 17 Feb 2021 07:53:04 -0800 Subject: [PATCH] Add recovery mechanism to rpcClient (#206) * Add recovery mechanism to rpcClient RPC operations will now attempt to recover, similar to sender and receiver, in case of failure. Fixed Recover() to atomically rebuild the client. Close() will now close auth auto-refresh. * add transient error checks * don't recover on a closed connection use common.Retry() for Recover() retry loop added recovery tracing and debug logging * improve some doc comments * avoid potential for infinite loop --- rpc.go | 218 ++++++++++++++++++++++++++--------------------------- tracing.go | 10 +++ 2 files changed, 117 insertions(+), 111 deletions(-) diff --git a/rpc.go b/rpc.go index 21b1753..1758cfb 100644 --- a/rpc.go +++ b/rpc.go @@ -30,6 +30,7 @@ import ( "sync" "time" + common "github.com/Azure/azure-amqp-common-go/v3" "github.com/Azure/azure-amqp-common-go/v3/rpc" "github.com/Azure/azure-amqp-common-go/v3/uuid" "github.com/Azure/go-amqp" @@ -60,65 +61,132 @@ func newRPCClient(ctx context.Context, ec entityConnector, opts ...rpcClientOpti return nil, err } } - + if err := r.newClient(ctx); err != nil { + tab.For(ctx).Error(err) + return nil, err + } return r, nil } +// newClient will replace the existing client and start auth auto-refresh. +// any pre-existing client MUST be closed before calling this method. +// NOTE: this does *not* take the write lock, callers must hold it as required! +func (r *rpcClient) newClient(ctx context.Context) error { + var err error + r.client, err = r.ec.Namespace().newClient(ctx) + if err != nil { + return err + } + r.cancelAuthRefresh, err = r.ec.Namespace().negotiateClaim(ctx, r.client, r.ec.ManagementPath()) + if err != nil { + return err + } + return nil +} + // Recover will attempt to close the current session and link, then rebuild them func (r *rpcClient) Recover(ctx context.Context) error { ctx, span := r.startSpanFromContext(ctx, "sb.rpcClient.Recover") defer span.End() - - _ = r.Close() - return r.ensureConn(ctx) + // atomically close and rebuild the client + r.clientMu.Lock() + defer r.clientMu.Unlock() + _ = r.close() + if err := r.newClient(ctx); err != nil { + tab.For(ctx).Error(err) + return err + } + return nil } // Close will close the AMQP connection func (r *rpcClient) Close() error { r.clientMu.Lock() defer r.clientMu.Unlock() + return r.close() +} + +// closes the AMQP connection. callers *must* hold the client write lock before calling! +func (r *rpcClient) close() error { if r.cancelAuthRefresh != nil { <-r.cancelAuthRefresh() } - return r.client.Close() } -func (r *rpcClient) ensureConn(ctx context.Context) error { - ctx, span := r.startSpanFromContext(ctx, "sb.rpcClient.ensureConn") - defer span.End() - - if r.client != nil { - return nil +// creates a new link and sends the RPC request, recovering and retrying on certain AMQP errors +func (r *rpcClient) doRPCWithRetry(ctx context.Context, address string, msg *amqp.Message, times int, delay time.Duration, opts ...rpc.LinkOption) (*rpc.Response, error) { + // track the number of times we attempt to perform the RPC call. + // this is to avoid a potential infinite loop if the returned error + // is always transient and Recover() doesn't fail. + sendCount := 0 + for { + r.clientMu.RLock() + client := r.client + r.clientMu.RUnlock() + var link *rpc.Link + var rsp *rpc.Response + var err error + link, err = rpc.NewLink(client, address, opts...) + if err == nil { + rsp, err = link.RetryableRPC(ctx, times, delay, msg) + if err == nil { + return rsp, err + } + } + if sendCount >= amqpRetryDefaultTimes || !isAMQPTransientError(ctx, err) { + return nil, err + } + sendCount++ + // if we get here, recover and try again + tab.For(ctx).Debug("recovering RPC connection") + _, retryErr := common.Retry(amqpRetryDefaultTimes, amqpRetryDefaultDelay, func() (interface{}, error) { + ctx, sp := r.startProducerSpanFromContext(ctx, "sb.rpcClient.doRPCWithRetry.tryRecover") + defer sp.End() + if err := r.Recover(ctx); err == nil { + tab.For(ctx).Debug("recovered RPC connection") + return nil, nil + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + return nil, common.Retryable(err.Error()) + } + }) + if retryErr != nil { + tab.For(ctx).Debug("RPC recovering retried, but error was unrecoverable") + return nil, retryErr + } } +} - r.clientMu.Lock() - defer r.clientMu.Unlock() - - client, err := r.ec.Namespace().newClient(ctx) - r.cancelAuthRefresh, err = r.ec.Namespace().negotiateClaim(ctx, client, r.ec.ManagementPath()) - if err != nil { - tab.For(ctx).Error(err) - _ = client.Close() - return err +// returns true if the AMQP error is considered transient +func isAMQPTransientError(ctx context.Context, err error) bool { + // always retry on a detach error + var amqpDetach *amqp.DetachError + if errors.As(err, &amqpDetach) { + return true } - - r.client = client - return err + // for an AMQP error, only retry depending on the condition + var amqpErr *amqp.Error + if errors.As(err, &amqpErr) { + switch amqpErr.Condition { + case errorServerBusy, errorTimeout, errorOperationCancelled, errorContainerClose: + return true + default: + tab.For(ctx).Debug(fmt.Sprintf("isAMQPTransientError: condition %s is not transient", amqpErr.Condition)) + return false + } + } + tab.For(ctx).Debug(fmt.Sprintf("isAMQPTransientError: %T is not transient", err)) + return false } func (r *rpcClient) ReceiveDeferred(ctx context.Context, mode ReceiveMode, sequenceNumbers ...int64) ([]*Message, error) { ctx, span := startConsumerSpanFromContext(ctx, "sb.rpcClient.ReceiveDeferred") defer span.End() - if err := r.ensureConn(ctx); err != nil { - tab.For(ctx).Error(err) - return nil, err - } - - r.clientMu.RLock() - defer r.clientMu.RUnlock() - const messagesField, messageField = "messages", "message" backwardsMode := uint32(0) @@ -137,12 +205,6 @@ func (r *rpcClient) ReceiveDeferred(ctx context.Context, mode ReceiveMode, seque values["session-id"] = r.sessionID } - link, err := rpc.NewLink(r.client, r.ec.ManagementPath(), opts...) - if err != nil { - tab.For(ctx).Error(err) - return nil, err - } - msg := &amqp.Message{ ApplicationProperties: map[string]interface{}{ operationFieldName: "com.microsoft:receive-by-sequence-number", @@ -150,7 +212,7 @@ func (r *rpcClient) ReceiveDeferred(ctx context.Context, mode ReceiveMode, seque Value: values, } - rsp, err := link.RetryableRPC(ctx, 5, 5*time.Second, msg) + rsp, err := r.doRPCWithRetry(ctx, r.ec.ManagementPath(), msg, 5, 5*time.Second, opts...) if err != nil { tab.For(ctx).Error(err) return nil, err @@ -227,14 +289,6 @@ func (r *rpcClient) GetNextPage(ctx context.Context, fromSequenceNumber int64, m ctx, span := startConsumerSpanFromContext(ctx, "sb.rpcClient.GetNextPage") defer span.End() - if err := r.ensureConn(ctx); err != nil { - tab.For(ctx).Error(err) - return nil, err - } - - r.clientMu.RLock() - defer r.clientMu.RUnlock() - const messagesField, messageField = "messages", "message" msg := &amqp.Message{ @@ -251,13 +305,7 @@ func (r *rpcClient) GetNextPage(ctx context.Context, fromSequenceNumber int64, m msg.ApplicationProperties["server-timeout"] = uint(time.Until(deadline) / time.Millisecond) } - link, err := rpc.NewLink(r.client, r.ec.ManagementPath()) - if err != nil { - tab.For(ctx).Error(err) - return nil, err - } - - rsp, err := link.RetryableRPC(ctx, 5, 5*time.Second, msg) + rsp, err := r.doRPCWithRetry(ctx, r.ec.ManagementPath(), msg, 5, 5*time.Second) if err != nil { tab.For(ctx).Error(err) return nil, err @@ -348,14 +396,6 @@ func (r *rpcClient) RenewLocks(ctx context.Context, messages ...*Message) error ctx, span := startConsumerSpanFromContext(ctx, "sb.RenewLocks") defer span.End() - if err := r.ensureConn(ctx); err != nil { - tab.For(ctx).Error(err) - return err - } - - r.clientMu.RLock() - defer r.clientMu.RUnlock() - lockTokens := make([]amqp.UUID, 0, len(messages)) for _, m := range messages { if m.LockToken == nil { @@ -381,13 +421,7 @@ func (r *rpcClient) RenewLocks(ctx context.Context, messages ...*Message) error }, } - rpcLink, err := rpc.NewLink(r.client, r.ec.ManagementPath()) - if err != nil { - tab.For(ctx).Error(err) - return err - } - - response, err := rpcLink.RetryableRPC(ctx, 3, 1*time.Second, renewRequestMsg) + response, err := r.doRPCWithRetry(ctx, r.ec.ManagementPath(), renewRequestMsg, 3, 1*time.Second) if err != nil { tab.For(ctx).Error(err) return err @@ -406,14 +440,6 @@ func (r *rpcClient) SendDisposition(ctx context.Context, m *Message, state dispo ctx, span := startConsumerSpanFromContext(ctx, "sb.rpcClient.SendDisposition") defer span.End() - if err := r.ensureConn(ctx); err != nil { - tab.For(ctx).Error(err) - return err - } - - r.clientMu.RLock() - defer r.clientMu.RUnlock() - if m.LockToken == nil { err := errors.New("lock token on the message is not set, thus cannot send disposition") tab.For(ctx).Error(err) @@ -446,14 +472,8 @@ func (r *rpcClient) SendDisposition(ctx context.Context, m *Message, state dispo Value: value, } - link, err := rpc.NewLink(r.client, m.ec.ManagementPath(), opts...) - if err != nil { - tab.For(ctx).Error(err) - return err - } - // no error, then it was successful - _, err = link.RetryableRPC(ctx, 5, 5*time.Second, msg) + _, err := r.doRPCWithRetry(ctx, m.ec.ManagementPath(), msg, 5, 5*time.Second, opts...) if err != nil { tab.For(ctx).Error(err) return err @@ -468,14 +488,6 @@ func (r *rpcClient) ScheduleAt(ctx context.Context, enqueueTime time.Time, messa ctx, span := startConsumerSpanFromContext(ctx, "sb.rpcClient.ScheduleAt") defer span.End() - if err := r.ensureConn(ctx); err != nil { - tab.For(ctx).Error(err) - return nil, err - } - - r.clientMu.RLock() - defer r.clientMu.RUnlock() - if len(messages) <= 0 { return nil, errors.New("expected one or more messages") } @@ -531,13 +543,9 @@ func (r *rpcClient) ScheduleAt(ctx context.Context, enqueueTime time.Time, messa msg.ApplicationProperties[serverTimeoutFieldName] = uint(time.Until(deadline) / time.Millisecond) } - link, err := rpc.NewLink(r.client, r.ec.ManagementPath()) - if err != nil { - return nil, err - } - - resp, err := link.RetryableRPC(ctx, 5, 5*time.Second, msg) + resp, err := r.doRPCWithRetry(ctx, r.ec.ManagementPath(), msg, 5, 5*time.Second) if err != nil { + tab.For(ctx).Error(err) return nil, err } @@ -568,14 +576,6 @@ func (r *rpcClient) CancelScheduled(ctx context.Context, seq ...int64) error { ctx, span := startConsumerSpanFromContext(ctx, "sb.rpcClient.CancelScheduled") defer span.End() - if err := r.ensureConn(ctx); err != nil { - tab.For(ctx).Error(err) - return err - } - - r.clientMu.RLock() - defer r.clientMu.RUnlock() - msg := &amqp.Message{ ApplicationProperties: map[string]interface{}{ operationFieldName: cancelScheduledOperationID, @@ -589,13 +589,9 @@ func (r *rpcClient) CancelScheduled(ctx context.Context, seq ...int64) error { msg.ApplicationProperties[serverTimeoutFieldName] = uint(time.Until(deadline) / time.Millisecond) } - link, err := rpc.NewLink(r.client, r.ec.ManagementPath()) - if err != nil { - return err - } - - resp, err := link.RetryableRPC(ctx, 5, 5*time.Second, msg) + resp, err := r.doRPCWithRetry(ctx, r.ec.ManagementPath(), msg, 5, 5*time.Second) if err != nil { + tab.For(ctx).Error(err) return err } diff --git a/tracing.go b/tracing.go index 841693b..f75ed3d 100644 --- a/tracing.go +++ b/tracing.go @@ -105,6 +105,16 @@ func (r *rpcClient) startSpanFromContext(ctx context.Context, operationName stri return ctx, span } +func (r *rpcClient) startProducerSpanFromContext(ctx context.Context, operationName string) (context.Context, tab.Spanner) { + ctx, span := tab.StartSpan(ctx, operationName) + applyComponentInfo(span) + span.AddAttributes( + tab.StringAttribute("span.kind", "producer"), + tab.StringAttribute("message_bus.destination", r.ec.ManagementPath()), + ) + return ctx, span +} + func startConsumerSpanFromContext(ctx context.Context, operationName string) (context.Context, tab.Spanner) { ctx, span := tab.StartSpan(ctx, operationName) applyComponentInfo(span)