Fix potential hang in Sender.Send() (#306)

* Fix potential hang in Sender.Send()

There were two observers of env.Sent, the session mux and the sender.
If the sender was the first to read from the channel, this would cause
the session mux to be blocked.
In order to support multiple observers, the channels must be treated as
semaphores which are signaled when closed.
Don't terminate a session's mux if a transfer wasn't sent due to
context cancellation/timeout (this is not terminal).

* simplify

* improve naming
This commit is contained in:
Joel Hendrix 2023-08-29 14:20:27 -07:00 коммит произвёл GitHub
Родитель a74ebbb50a
Коммит 766ec611cf
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 158 добавлений и 91 удалений

Просмотреть файл

@ -5,6 +5,7 @@
### Bugs Fixed
* Fixed an issue that could cause frames to be sent even when the provided `context.Context` was cancelled.
* Fixed a potential hang in `Sender.Send()` that could happen in rare circumstances.
## 1.0.1 (2023-06-08)

55
conn.go
Просмотреть файл

@ -712,14 +712,26 @@ func (c *Conn) readFrame() (frames.Frame, error) {
}
}
// frameContext is an extended context.Context used to track writes to the network.
// this is required in order to remove ambiguities that can arise when simply waiting
// on context.Context.Done() to be signaled.
type frameContext struct {
// Ctx contains the caller's context and is used to set the write deadline.
Ctx context.Context
// Done is closed when the frame was successfully written to net.Conn or Ctx was cancelled/timed out.
// Can be nil, but shouldn't be for callers that care about confirmation of sending.
Done chan struct{}
// Err contains the context error. MUST be set before closing Done and ONLY read if Done is closed.
// ONLY Conn.connWriter may write to this field.
Err error
}
// frameEnvelope is used when sending a frame to connWriter to be written to net.Conn
type frameEnvelope struct {
Ctx context.Context
Frame frames.Frame
// optional channel that is closed on successful write to net.Conn or contains the write error
// NOTE: use a buffered channel of size 1 when populating
Sent chan error
FrameCtx *frameContext
Frame frames.Frame
}
func (c *Conn) connWriter() {
@ -754,24 +766,24 @@ func (c *Conn) connWriter() {
select {
// frame write request
case env := <-c.txFrame:
timeout, ctxErr := c.getWriteTimeout(env.Ctx)
timeout, ctxErr := c.getWriteTimeout(env.FrameCtx.Ctx)
if ctxErr != nil {
debug.Log(1, "TX (connWriter %p) getWriteTimeout: %s: %s", c, ctxErr.Error(), env.Frame)
if env.Sent != nil {
env.Sent <- ctxErr
if env.FrameCtx.Done != nil {
// the error MUST be set before closing the channel
env.FrameCtx.Err = ctxErr
close(env.FrameCtx.Done)
}
continue
}
debug.Log(0, "TX (connWriter %p) timeout %s: %s", c, timeout, env.Frame)
err = c.writeFrame(timeout, env.Frame)
if env.Sent != nil {
if err == nil {
close(env.Sent)
} else {
env.Sent <- err
}
if err == nil && env.FrameCtx.Done != nil {
close(env.FrameCtx.Done)
}
// in the event of write failure, Conn will close and a
// *ConnError will be propagated to all of the sessions/link.
// keepalive timer
case <-keepalive:
@ -852,17 +864,12 @@ func (c *Conn) writeProtoHeader(pID protoID) error {
var keepaliveFrame = []byte{0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}
// SendFrame is used by sessions and links to send frames across the network.
// - ctx is used to provide the write deadline
// - fr is the frame to write to net.Conn
// - sent is the optional channel that will contain the error if the write fails
func (c *Conn) sendFrame(ctx context.Context, fr frames.Frame, sent chan error) {
func (c *Conn) sendFrame(frameEnv frameEnvelope) {
select {
case c.txFrame <- frameEnvelope{Ctx: ctx, Frame: fr, Sent: sent}:
debug.Log(2, "TX (Conn %p): mux frame to connWriter: %s", c, fr)
case c.txFrame <- frameEnv:
debug.Log(2, "TX (Conn %p): mux frame to connWriter: %s", c, frameEnv.Frame)
case <-c.done:
if sent != nil {
sent <- c.doneErr
}
// Conn has closed
}
}

28
link.go
Просмотреть файл

@ -277,7 +277,7 @@ func (l *link) muxHandleFrame(fr frames.FrameBody) error {
Handle: l.outputHandle,
Closed: true,
}
l.txFrame(context.Background(), dr, nil)
l.txFrame(&frameContext{Ctx: context.Background()}, dr)
return &LinkError{RemoteErr: fr.Error}
default:
@ -341,25 +341,22 @@ func (l *link) closeWithError(cnd ErrCond, desc string) {
}
l.closeInProgress = true
l.doneErr = &LinkError{inner: fmt.Errorf("%s: %s", cnd, desc)}
l.txFrame(context.Background(), dr, nil)
l.txFrame(&frameContext{Ctx: context.Background()}, dr)
}
// txFrame sends the specified frame via the link's session.
// you MUST call this instead of session.txFrame() to ensure
// that frames are not sent during session shutdown.
func (l *link) txFrame(ctx context.Context, fr frames.FrameBody, sent chan error) {
func (l *link) txFrame(frameCtx *frameContext, fr frames.FrameBody) {
// NOTE: there is no need to select on l.done as this is either
// called from a link's mux or before the mux has even started.
select {
case <-l.session.done:
if sent != nil {
sent <- l.session.doneErr
}
// the link's session has terminated, let that propagate to the link's mux
case <-l.session.endSent:
// we swallow this to prevent the link's mux from terminating.
// l.session.done will soon close so this is temporary.
return
case l.session.tx <- frameBodyEnvelope{Ctx: ctx, FrameBody: fr, Sent: sent}:
case l.session.tx <- frameBodyEnvelope{FrameCtx: frameCtx, FrameBody: fr}:
debug.Log(2, "TX (link %p): mux frame to Session (%p): %s", l, l.session, fr)
}
}
@ -368,9 +365,14 @@ func (l *link) txFrame(ctx context.Context, fr frames.FrameBody, sent chan error
// you MUST call this instead of session.txFrame() to ensure
// that frames are not sent during session shutdown.
func (l *link) txFrameAndWait(ctx context.Context, fr frames.FrameBody) error {
frameCtx := frameContext{
Ctx: ctx,
Done: make(chan struct{}),
}
// NOTE: there is no need to select on l.done as this is either
// called from a link's mux or before the mux has even started.
sent := make(chan error, 1)
select {
case <-l.session.done:
return l.session.doneErr
@ -378,15 +380,13 @@ func (l *link) txFrameAndWait(ctx context.Context, fr frames.FrameBody) error {
// we swallow this to prevent the link's mux from terminating.
// l.session.done will soon close so this is temporary.
return nil
case l.session.tx <- frameBodyEnvelope{Ctx: ctx, FrameBody: fr, Sent: sent}:
case l.session.tx <- frameBodyEnvelope{FrameCtx: &frameCtx, FrameBody: fr}:
debug.Log(2, "TX (link %p): mux frame to Session (%p): %s", l, l.session, fr)
}
select {
case err := <-sent:
return err
case <-l.done:
return l.doneErr
case <-frameCtx.Done:
return frameCtx.Err
case <-l.session.done:
return l.session.doneErr
}

Просмотреть файл

@ -249,17 +249,21 @@ func (r *Receiver) sendDisposition(ctx context.Context, first uint32, last *uint
State: state,
}
sent := make(chan error, 1)
frameCtx := frameContext{
Ctx: ctx,
Done: make(chan struct{}),
}
select {
case r.txDisposition <- frameBodyEnvelope{Ctx: ctx, FrameBody: fr, Sent: sent}:
case r.txDisposition <- frameBodyEnvelope{FrameCtx: &frameCtx, FrameBody: fr}:
debug.Log(2, "TX (Receiver %p): mux txDisposition %s", r, fr)
case <-r.l.done:
return r.l.doneErr
}
select {
case err := <-sent:
return err
case <-frameCtx.Done:
return frameCtx.Err
case <-r.l.done:
return r.l.doneErr
}
@ -581,7 +585,7 @@ func (r *Receiver) mux(hooks receiverTestHooks) {
}
case env := <-txDisposition:
r.l.txFrame(env.Ctx, env.FrameBody, env.Sent)
r.l.txFrame(env.FrameCtx, env.FrameBody)
case <-r.receiverReady:
continue
@ -598,7 +602,7 @@ func (r *Receiver) mux(hooks receiverTestHooks) {
Handle: r.l.outputHandle,
Closed: true,
}
r.l.txFrame(context.Background(), fr, nil)
r.l.txFrame(&frameContext{Ctx: context.Background()}, fr)
case <-r.l.session.done:
r.l.doneErr = r.l.session.doneErr
@ -633,7 +637,7 @@ func (r *Receiver) muxFlow(linkCredit uint32, drain bool) error {
}
select {
case r.l.session.tx <- frameBodyEnvelope{Ctx: context.Background(), FrameBody: fr}:
case r.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: fr}:
debug.Log(2, "TX (Receiver %p): mux frame to Session (%p): %d, %s", r, r.l.session, r.l.session.channel, fr)
return nil
case <-r.l.close:
@ -677,7 +681,7 @@ func (r *Receiver) muxHandleFrame(fr frames.FrameBody) error {
}
select {
case r.l.session.tx <- frameBodyEnvelope{Ctx: context.Background(), FrameBody: resp}:
case r.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: resp}:
debug.Log(2, "TX (Receiver %p): mux frame to Session (%p): %d, %s", r, r.l.session, r.l.session.channel, resp)
case <-r.l.close:
return nil

Просмотреть файл

@ -168,9 +168,13 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
// NOTE: we MUST send a copy of fr here since we modify it post send
sent := make(chan error, 1)
frameCtx := frameContext{
Ctx: ctx,
Done: make(chan struct{}),
}
select {
case s.transfers <- transferEnvelope{Ctx: ctx, InputHandle: s.l.inputHandle, Frame: fr, Sent: sent}:
case s.transfers <- transferEnvelope{FrameCtx: &frameCtx, InputHandle: s.l.inputHandle, Frame: fr}:
// frame was sent to our mux
case <-s.l.done:
return nil, s.l.doneErr
@ -179,10 +183,11 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
}
select {
case err := <-sent:
if err != nil {
return nil, err
case <-frameCtx.Done:
if frameCtx.Err != nil {
return nil, frameCtx.Err
}
// frame was written to the network
case <-s.l.done:
return nil, s.l.doneErr
}
@ -392,7 +397,7 @@ Loop:
Handle: s.l.outputHandle,
Closed: true,
}
s.l.txFrame(context.Background(), fr, nil)
s.l.txFrame(&frameContext{Ctx: context.Background()}, fr)
case <-s.l.session.done:
s.l.doneErr = s.l.session.doneErr
@ -437,7 +442,7 @@ func (s *Sender) muxHandleFrame(fr frames.FrameBody) error {
}
select {
case s.l.session.tx <- frameBodyEnvelope{Ctx: context.Background(), FrameBody: resp}:
case s.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: resp}:
debug.Log(2, "TX (Sender %p): mux frame to Session (%p): %d, %s", s, s.l.session, s.l.session.channel, resp)
case <-s.l.close:
return nil
@ -461,7 +466,7 @@ func (s *Sender) muxHandleFrame(fr frames.FrameBody) error {
}
select {
case s.l.session.tx <- frameBodyEnvelope{Ctx: context.Background(), FrameBody: dr}:
case s.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: dr}:
debug.Log(2, "TX (Sender %p): mux frame to Session (%p): %d, %s", s, s.l.session, s.l.session.channel, dr)
case <-s.l.close:
return nil

Просмотреть файл

@ -1260,3 +1260,52 @@ func TestSenderUnexpectedFrame(t *testing.T) {
require.ErrorContains(t, err, "unexpected frame *frames.PerformTransfer")
require.NoError(t, client.Close())
}
func TestSenderSendFails(t *testing.T) {
responder := func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
resp, err := senderFrameHandler(0, SenderSettleModeUnsettled)(remoteChannel, req)
if err != nil || resp.Payload != nil {
return resp, err
}
switch req.(type) {
case *frames.PerformTransfer:
return fake.Response{}, errors.New("send failed")
default:
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, netConn, nil)
cancel()
require.NoError(t, err)
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx, nil)
cancel()
require.NoError(t, err)
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
snd, err := session.NewSender(ctx, "target", nil)
cancel()
require.NoError(t, err)
const linkCredit = 100
sendInitialFlowFrame(t, 0, netConn, 0, linkCredit)
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
msg := NewMessage([]byte("test"))
connErr := &ConnError{}
require.ErrorAs(t, snd.Send(ctx, msg, nil), &connErr)
cancel()
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
require.ErrorAs(t, session.Close(ctx), &connErr)
cancel()
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
require.ErrorAs(t, snd.Close(ctx), &connErr)
cancel()
require.ErrorAs(t, client.Close(), &connErr)
}

Просмотреть файл

@ -211,26 +211,32 @@ func (s *Session) Close(ctx context.Context) error {
// txFrame sends a frame to the connWriter.
// - ctx is used to provide the write deadline
// - fr is the frame to write to net.Conn
// - sent is the optional channel that will contain the error if the write fails
func (s *Session) txFrame(ctx context.Context, fr frames.FrameBody, sent chan error) {
func (s *Session) txFrame(frameCtx *frameContext, fr frames.FrameBody) {
debug.Log(2, "TX (Session %p) mux frame to Conn (%p): %s", s, s.conn, fr)
s.conn.sendFrame(ctx, frames.Frame{
Type: frames.TypeAMQP,
Channel: s.channel,
Body: fr,
}, sent)
s.conn.sendFrame(frameEnvelope{
FrameCtx: frameCtx,
Frame: frames.Frame{
Type: frames.TypeAMQP,
Channel: s.channel,
Body: fr,
},
})
}
// txFrameAndWait sends a frame to the connWriter and waits for the write to complete
// - ctx is used to provide the write deadline
// - fr is the frame to write to net.Conn
func (s *Session) txFrameAndWait(ctx context.Context, fr frames.FrameBody) error {
sent := make(chan error, 1)
s.txFrame(ctx, fr, sent)
frameCtx := frameContext{
Ctx: ctx,
Done: make(chan struct{}),
}
s.txFrame(&frameCtx, fr)
select {
case err := <-sent:
return err
case <-frameCtx.Done:
return frameCtx.Err
case <-s.conn.done:
return s.conn.doneErr
case <-s.done:
@ -345,7 +351,7 @@ func (s *Session) mux(remoteBegin *frames.PerformBegin) {
closeInProgress = true
s.doneErr = e2
s.txFrame(context.Background(), &frames.PerformEnd{Error: e1}, nil)
s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{Error: e1})
close(s.endSent)
}
@ -391,7 +397,7 @@ func (s *Session) mux(remoteBegin *frames.PerformBegin) {
}
// session is being closed by the client
closeInProgress = true
s.txFrame(context.Background(), &frames.PerformEnd{}, nil)
s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{})
close(s.endSent)
// incoming frame
@ -506,7 +512,7 @@ func (s *Session) mux(remoteBegin *frames.PerformBegin) {
NextOutgoingID: nextOutgoingID,
OutgoingWindow: s.outgoingWindow,
}
s.txFrame(context.Background(), resp, nil)
s.txFrame(&frameContext{Ctx: context.Background()}, resp)
}
case *frames.PerformAttach:
@ -575,7 +581,7 @@ func (s *Session) mux(remoteBegin *frames.PerformBegin) {
NextOutgoingID: nextOutgoingID,
OutgoingWindow: s.outgoingWindow,
}
s.txFrame(context.Background(), flow, nil)
s.txFrame(&frameContext{Ctx: context.Background()}, flow)
}
case *frames.PerformDetach:
@ -613,7 +619,7 @@ func (s *Session) mux(remoteBegin *frames.PerformBegin) {
}
fr := frames.PerformEnd{}
s.txFrame(context.Background(), &fr, nil)
s.txFrame(&frameContext{Ctx: context.Background()}, &fr)
// per spec, when end is received, we're no longer allowed to receive frames
return
@ -654,13 +660,18 @@ func (s *Session) mux(remoteBegin *frames.PerformBegin) {
delete(inputHandleFromDeliveryID, deliveryID)
}
s.txFrame(env.Ctx, fr, env.Sent)
if sendErr := <-env.Sent; sendErr != nil {
s.doneErr = sendErr
s.txFrame(env.FrameCtx, fr)
// put the error back as our sender will read from this channel
env.Sent <- sendErr
return
select {
case <-env.FrameCtx.Done:
if env.FrameCtx.Err != nil {
// transfer wasn't sent, don't update state
continue
}
// transfer was written to the network
case <-s.conn.done:
// the write failed, Conn is going down
continue
}
// if not settled, add done chan to map
@ -707,18 +718,18 @@ func (s *Session) mux(remoteBegin *frames.PerformBegin) {
}
}
}
s.txFrame(env.Ctx, fr, env.Sent)
s.txFrame(env.FrameCtx, fr)
case *frames.PerformFlow:
niID := nextIncomingID
fr.NextIncomingID = &niID
fr.IncomingWindow = s.incomingWindow
fr.NextOutgoingID = nextOutgoingID
fr.OutgoingWindow = s.outgoingWindow
s.txFrame(context.Background(), fr, env.Sent)
s.txFrame(env.FrameCtx, fr)
case *frames.PerformTransfer:
panic("transfer frames must use txTransfer")
default:
s.txFrame(context.Background(), fr, env.Sent)
s.txFrame(env.FrameCtx, fr)
}
}
}
@ -792,28 +803,18 @@ func (s *Session) muxFrameToLink(l *link, fr frames.FrameBody) {
// transferEnvelope is used by senders to send transfer frames
type transferEnvelope struct {
Ctx context.Context
FrameCtx *frameContext
// the link's remote handle
InputHandle uint32
Frame frames.PerformTransfer
// Sent is *never* nil as we use this for confirmation of sending
// NOTE: use a buffered channel of size 1 when populating
Sent chan error
}
// frameBodyEnvelope is used by senders and receivers to send frames.
type frameBodyEnvelope struct {
Ctx context.Context
FrameCtx *frameContext
FrameBody frames.FrameBody
// Sent *can* be nil depending on what frame is being sent.
// e.g. sending a disposition frame frame a receiver's settlement
// APIs will have a non-nil channel vs sending a flow frame
// NOTE: use a buffered channel of size 1 when populating
Sent chan error
}
// the address of this var is a sentinel value indicating