* fix #149 add mutex RLock/RUnlock

* Recover should close and rebuild link atomically.

Return ErrConnectionClosed when attempting to send a message on a closed
connection.

* Take read lock when accessing s.sender

Consolidate code for creating ErrConnectionClosed

Co-authored-by: Joel Hendrix <jhendrix@microsoft.com>
This commit is contained in:
Mitsuo Heijo 2020-04-29 04:50:06 +09:00 коммит произвёл GitHub
Родитель 1839bfc2a4
Коммит 7f4325a886
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 44 добавлений и 6 удалений

Просмотреть файл

@ -35,6 +35,9 @@ type (
ErrNotFound struct {
EntityPath string
}
// ErrConnectionClosed indicates that the connection has been closed.
ErrConnectionClosed string
)
func (e ErrMissingField) Error() string {
@ -80,3 +83,7 @@ func IsErrNotFound(err error) bool {
_, ok := err.(ErrNotFound)
return ok
}
func (e ErrConnectionClosed) Error() string {
return fmt.Sprintf("the connection has been closed: %s", string(e))
}

Просмотреть файл

@ -24,13 +24,13 @@ package servicebus
import (
"context"
common "github.com/Azure/azure-amqp-common-go/v3"
"sync"
"time"
common "github.com/Azure/azure-amqp-common-go/v3"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/devigned/tab"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
)
type (
@ -77,6 +77,7 @@ func (ns *Namespace) NewSender(ctx context.Context, entityPath string, opts ...S
}
}
// no need to take the write lock as we're creating a new Sender
err := s.newSessionAndLink(ctx)
if err != nil {
tab.For(ctx).Error(err)
@ -96,7 +97,10 @@ func (s *Sender) Recover(ctx context.Context) error {
closeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
closeCtx = tab.NewContext(closeCtx, span)
defer cancel()
_ = s.Close(ctx)
// we must close then rebuild the session/link atomically
s.clientMu.Lock()
defer s.clientMu.Unlock()
_ = s.close(ctx)
return s.newSessionAndLink(ctx)
}
@ -108,6 +112,11 @@ func (s *Sender) Close(ctx context.Context) error {
s.clientMu.Lock()
defer s.clientMu.Unlock()
return s.close(ctx)
}
// closes the connection. callers *must* hold the client write lock before calling!
func (s *Sender) close(ctx context.Context) error {
if s.doneRefreshingAuth != nil {
s.doneRefreshingAuth()
}
@ -148,8 +157,15 @@ func (s *Sender) Send(ctx context.Context, msg *Message, opts ...SendOption) err
defer span.End()
if msg.SessionID == nil {
s.clientMu.RLock()
if s.session == nil {
// another goroutine has closed the connection
s.clientMu.RUnlock()
return s.connClosedError(ctx)
}
msg.SessionID = &s.session.SessionID
next := s.session.getNext()
s.clientMu.RUnlock()
msg.GroupSequence = &next
}
@ -202,7 +218,14 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {
return ctx.Err()
default:
// try as long as the context is not dead
s.clientMu.RLock()
if s.sender == nil {
// another goroutine has closed the connection
s.clientMu.RUnlock()
return s.connClosedError(ctx)
}
err = s.sender.Send(ctx, msg)
s.clientMu.RUnlock()
if err == nil {
// successful send
return err
@ -244,6 +267,16 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {
}
}
func (s *Sender) connClosedError(ctx context.Context) error {
name := "Sender"
if s.Name != "" {
name = s.Name
}
err := ErrConnectionClosed(name)
tab.For(ctx).Error(err)
return err
}
func (s *Sender) String() string {
return s.Name
}
@ -257,13 +290,11 @@ func (s *Sender) getFullIdentifier() string {
}
// newSessionAndLink will replace the existing session and link
// NOTE: this does *not* take the write lock, callers must hold it as required!
func (s *Sender) newSessionAndLink(ctx context.Context) error {
ctx, span := s.startProducerSpanFromContext(ctx, "sb.Sender.newSessionAndLink")
defer span.End()
s.clientMu.Lock()
defer s.clientMu.Unlock()
connection, err := s.namespace.newClient()
if err != nil {
tab.For(ctx).Error(err)