package amqp

import (
	"context"
	"errors"
	"fmt"
	"math"
	"sync"

	"github.com/Azure/go-amqp/internal/bitmap"
	"github.com/Azure/go-amqp/internal/debug"
	"github.com/Azure/go-amqp/internal/encoding"
	"github.com/Azure/go-amqp/internal/frames"
	"github.com/Azure/go-amqp/internal/queue"
)

// Default session options
const (
	defaultWindow = 5000
)

// SessionOptions contains the optional settings for configuring an AMQP session.
type SessionOptions struct {
	// MaxLinks sets the maximum number of links (Senders/Receivers)
	// allowed on the session.
	//
	// Minimum: 1.
	// Default: 4294967295.
	MaxLinks uint32
}
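
// A SessionOptions value is passed to Conn.NewSession. A minimal usage
// sketch (assuming an established *Conn named conn):
//
//	session, err := conn.NewSession(context.TODO(), &SessionOptions{
//		MaxLinks: 10,
//	})
//	if err != nil {
//		// handle the error
//	}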

// Session is an AMQP session.
//
// A session multiplexes Senders and Receivers.
type Session struct {
	channel       uint16                 // session's local channel
	remoteChannel uint16                 // session's remote channel, owned by conn.connReader
	conn          *Conn                  // underlying conn
	tx            chan frameBodyEnvelope // non-transfer frames to be sent; session must track disposition
	txTransfer    chan transferEnvelope  // transfer frames to be sent; session must track disposition

	// frames destined for this session are added to this queue by conn.connReader
	rxQ *queue.Holder[frames.FrameBody]

	// properties returned by the peer
	peerProperties map[string]any

	// flow control
	incomingWindow uint32
	outgoingWindow uint32
	needFlowCount  uint32

	handleMax uint32

	// link management
	linksMu       sync.RWMutex      // used to synchronize link handle allocation
	linksByKey    map[linkKey]*link // mapping of name+role to link
	outputHandles *bitmap.Bitmap    // allocated output handles

	abandonedLinksMu sync.Mutex
	abandonedLinks   []*link

	// used for gracefully closing session
	close     chan struct{} // closed by calling Close(). it signals that the end performative should be sent
	closeOnce sync.Once

	// part of internal public surface area
	done     chan struct{} // closed when the session has terminated (mux exited); DO NOT wait on this from within Session.mux() as it will never trigger!
	endSent  chan struct{} // closed when the end performative has been sent; once this is closed, links MUST NOT send any frames!
	doneErr  error         // contains the mux error state; ONLY written to by the mux and MUST only be read from after done is closed!
	closeErr error         // contains the error state returned from Close(); ONLY Close() reads/writes this!
}

func newSession(c *Conn, channel uint16, opts *SessionOptions) *Session {
	s := &Session{
		conn:           c,
		channel:        channel,
		tx:             make(chan frameBodyEnvelope),
		txTransfer:     make(chan transferEnvelope),
		incomingWindow: defaultWindow,
		outgoingWindow: defaultWindow,
		handleMax:      math.MaxUint32 - 1,
		linksMu:        sync.RWMutex{},
		linksByKey:     make(map[linkKey]*link),
		close:          make(chan struct{}),
		done:           make(chan struct{}),
		endSent:        make(chan struct{}),
	}

	if opts != nil {
		if opts.MaxLinks != 0 {
			// MaxLinks is the total number of links, while handleMax
			// is the highest usable handle ID, which starts at zero,
			// so we decrement by one.
			s.handleMax = opts.MaxLinks - 1
		}
	}

	// create output handle map after options have been applied
	s.outputHandles = bitmap.New(s.handleMax)
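
	// size the incoming frame queue from our incoming window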
	s.rxQ = queue.NewHolder(queue.New[frames.FrameBody](int(s.incomingWindow)))

	return s
}

// waitForFrame waits for an incoming frame to be queued.
// it returns the next frame from the queue, or an error.
// the error is either from the context or conn.doneErr.
// not meant for consumption outside of session.go.
func (s *Session) waitForFrame(ctx context.Context) (frames.FrameBody, error) {
	var q *queue.Queue[frames.FrameBody]
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-s.conn.done:
		return nil, s.conn.doneErr
	case q = <-s.rxQ.Wait():
		// populated queue
	}

	fr := q.Dequeue()
	s.rxQ.Release(q)

	return *fr, nil
}

func (s *Session) begin(ctx context.Context) error {
	// send Begin to server
	begin := &frames.PerformBegin{
		NextOutgoingID: 0,
		IncomingWindow: s.incomingWindow,
		OutgoingWindow: s.outgoingWindow,
		HandleMax:      s.handleMax,
	}

	if err := s.txFrameAndWait(ctx, begin); err != nil {
		return err
	}

	// wait for response
	fr, err := s.waitForFrame(ctx)
	if err != nil {
		// if we exit before receiving the ack, our caller will clean up the channel.
		// however, it does mean that the peer will now have assigned an outgoing
		// channel ID that's not in use.
		return err
	}

	begin, ok := fr.(*frames.PerformBegin)
	if !ok {
		// this codepath is hard to hit (impossible?). if the response isn't a PerformBegin and we've not
		// yet seen the remote channel number, the default clause in conn.connReader will protect us from that.
		// if we have seen the remote channel number then it's likely the session.mux for that channel will
		// either swallow the frame or blow up in some other way, both causing this call to hang.
		// deallocate session on error. we can't call
		// s.Close() as the session mux hasn't started yet.
		debug.Log(1, "RX (Session %p): unexpected begin response frame %T", s, fr)
		s.conn.deleteSession(s)
		if err := s.conn.Close(); err != nil {
			return err
		}
		return &ConnError{inner: fmt.Errorf("unexpected begin response: %#v", fr)}
	}

	if len(begin.Properties) > 0 {
		s.peerProperties = map[string]any{}
		for k, v := range begin.Properties {
			s.peerProperties[string(k)] = v
		}
	}

	// start Session multiplexor
	go s.mux(begin)

	return nil
}

// Close closes the session.
//   - ctx controls waiting for the peer to acknowledge the session is closed
//
// If the context's deadline expires or is cancelled before the operation
// completes, an error is returned. However, the operation will continue to
// execute in the background. Subsequent calls will return a *SessionError
// that contains the context's error message.
func (s *Session) Close(ctx context.Context) error {
	var ctxErr error
	s.closeOnce.Do(func() {
		close(s.close)

		// once the mux has received the ack'ing end performative, the mux will
		// exit which deletes the session and closes s.done.
		select {
		case <-s.done:
			s.closeErr = s.doneErr

		case <-ctx.Done():
			// notify the caller that the close timed out/was cancelled.
			// the mux will remain running and once the ack is received it will terminate.
			ctxErr = ctx.Err()

			// record that the close timed out/was cancelled.
			// subsequent calls to Close() will return this.
			debug.Log(1, "TX (Session %p) channel %d: %v", s, s.channel, ctxErr)
			s.closeErr = &SessionError{inner: ctxErr}
		}
	})

	if ctxErr != nil {
		return ctxErr
	}

	var sessionErr *SessionError
	if errors.As(s.closeErr, &sessionErr) && sessionErr.RemoteErr == nil && sessionErr.inner == nil {
		// an empty SessionError means the session was cleanly closed by the caller
		return nil
	}
	return s.closeErr
}

// txFrame sends a frame to the connWriter.
//   - frameCtx contains the context used to provide the write deadline
//   - fr is the frame to write to net.Conn
func (s *Session) txFrame(frameCtx *frameContext, fr frames.FrameBody) {
	debug.Log(2, "TX (Session %p) mux frame to Conn (%p): %s", s, s.conn, fr)
	s.conn.sendFrame(frameEnvelope{
		FrameCtx: frameCtx,
		Frame: frames.Frame{
			Type:    frames.TypeAMQP,
			Channel: s.channel,
			Body:    fr,
		},
	})
}

// txFrameAndWait sends a frame to the connWriter and waits for the write to complete.
//   - ctx is used to provide the write deadline
//   - fr is the frame to write to net.Conn
func (s *Session) txFrameAndWait(ctx context.Context, fr frames.FrameBody) error {
	frameCtx := frameContext{
		Ctx:  ctx,
		Done: make(chan struct{}),
	}

	s.txFrame(&frameCtx, fr)

	select {
	case <-frameCtx.Done:
		return frameCtx.Err
	case <-s.conn.done:
		return s.conn.doneErr
	case <-s.done:
		return s.doneErr
	}
}

// NewReceiver opens a new receiver link on the session.
//   - ctx controls waiting for the peer to create a sending terminus
//   - source is the name of the peer's sending terminus
//   - opts contains optional values, pass nil to accept the defaults
//
// If the context's deadline expires or is cancelled before the operation
// completes, an error is returned. If the Receiver was successfully
// created, it will be cleaned up in future calls to NewReceiver.
func (s *Session) NewReceiver(ctx context.Context, source string, opts *ReceiverOptions) (*Receiver, error) {
	return newReceiverForSession(ctx, s, source, opts, receiverTestHooks{})
}

// split out so tests can add hooks
func newReceiverForSession(ctx context.Context, s *Session, source string, opts *ReceiverOptions, hooks receiverTestHooks) (*Receiver, error) {
	r, err := newReceiver(source, s, opts)
	if err != nil {
		return nil, err
	}
	if err = r.attach(ctx); err != nil {
		return nil, err
	}

	go r.mux(hooks)

	return r, nil
}

// NewSender opens a new sender link on the session.
//   - ctx controls waiting for the peer to create a receiver terminus
//   - target is the name of the peer's receiver terminus
//   - opts contains optional values, pass nil to accept the defaults
//
// If the context's deadline expires or is cancelled before the operation
// completes, an error is returned. If the Sender was successfully
// created, it will be cleaned up in future calls to NewSender.
func (s *Session) NewSender(ctx context.Context, target string, opts *SenderOptions) (*Sender, error) {
	return newSenderForSession(ctx, s, target, opts, senderTestHooks{})
}

// Properties returns the peer's session properties.
// Returns nil if the peer didn't send any properties.
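// Note that the returned map is the Session's internal copy; callers should treat it as read-only.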
func (s *Session) Properties() map[string]any {
	return s.peerProperties
}

// split out so tests can add hooks
func newSenderForSession(ctx context.Context, s *Session, target string, opts *SenderOptions, hooks senderTestHooks) (*Sender, error) {
	l, err := newSender(target, s, opts)
	if err != nil {
		return nil, err
	}
	if err = l.attach(ctx); err != nil {
		return nil, err
	}

	go l.mux(hooks)

	return l, nil
}

func (s *Session) mux(remoteBegin *frames.PerformBegin) {
	defer func() {
		if s.doneErr == nil {
			s.doneErr = &SessionError{}
		} else if connErr := (&ConnError{}); !errors.As(s.doneErr, &connErr) {
			// only wrap non-ConnError error types
			var amqpErr *Error
			if errors.As(s.doneErr, &amqpErr) {
				s.doneErr = &SessionError{RemoteErr: amqpErr}
			} else {
				s.doneErr = &SessionError{inner: s.doneErr}
			}
		}
		// Signal goroutines waiting on the session.
		close(s.done)
	}()

	var (
		// maps input (remote) handles to links
		linkFromInputHandle = make(map[uint32]*link)

		// maps local delivery IDs (sending transfers) to input (remote) handles
		inputHandleFromDeliveryID = make(map[uint32]uint32)

		// maps remote delivery IDs (receiving transfers) to input (remote) handles
		inputHandleFromRemoteDeliveryID = make(map[uint32]uint32)

		// maps delivery IDs to output (our) handles. used for multi-frame transfers
		deliveryIDFromOutputHandle = make(map[uint32]uint32)

		// maps delivery IDs to the settlement state channel
		settlementFromDeliveryID = make(map[uint32]chan encoding.DeliveryState)

		// tracks the next delivery ID for outgoing transfers
		nextDeliveryID uint32

		// flow control values
		nextOutgoingID       uint32
		nextIncomingID       = remoteBegin.NextOutgoingID
		remoteIncomingWindow = remoteBegin.IncomingWindow
		remoteOutgoingWindow = remoteBegin.OutgoingWindow

		closeInProgress bool // indicates the end performative has been sent
	)

	closeWithError := func(e1 *Error, e2 error) {
		if closeInProgress {
			debug.Log(3, "TX (Session %p): close already pending, discarding %v", s, e1)
			return
		}

		closeInProgress = true
		s.doneErr = e2
		s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{Error: e1})
		close(s.endSent)
	}

	for {
		txTransfer := s.txTransfer
		// disable txTransfer if flow control windows have been exceeded
		if remoteIncomingWindow == 0 || s.outgoingWindow == 0 {
			debug.Log(1, "TX (Session %p): disabling txTransfer - window exceeded. remoteIncomingWindow: %d outgoingWindow: %d",
				s, remoteIncomingWindow, s.outgoingWindow)
			txTransfer = nil
		}

		tx := s.tx
		closed := s.close
		if closeInProgress {
			// swap out channel so it no longer triggers
			closed = nil

			// once the end performative is sent, we're not allowed to send any frames
			tx = nil
			txTransfer = nil
		}

		// notes on client-side closing of the session:
		// when the session is closed, we must keep the mux running until the ack'ing end performative
		// has been received. during this window, the session is allowed to receive frames but cannot
		// send them.
		// client-side close happens either by the user calling Session.Close() or by a mux-initiated
		// close due to a violation of some invariant (see sending &Error{} to s.close). in the case
		// that both code paths have been triggered, we must be careful to preserve the error that
		// triggered the mux-initiated close so it can be surfaced to the caller.

		select {
		// conn has completed, exit
		case <-s.conn.done:
			s.doneErr = s.conn.doneErr
			return

		case <-closed:
			if closeInProgress {
				// a client-side close due to protocol error is in progress
				continue
			}
			// session is being closed by the client
			closeInProgress = true
			s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{})
			close(s.endSent)

		// incoming frame
		case q := <-s.rxQ.Wait():
			fr := *q.Dequeue()
			s.rxQ.Release(q)
			debug.Log(2, "RX (Session %p): %s", s, fr)

			switch body := fr.(type) {
			// Disposition frames can reference transfers from more than one
			// link. Send this frame to all of them.
			case *frames.PerformDisposition:
				start := body.First
				end := start
				if body.Last != nil {
					end = *body.Last
				}
				for deliveryID := start; deliveryID <= end; deliveryID++ {
					// find the input (remote) handle for this delivery ID.
					// default to the map for local delivery IDs.
					handles := inputHandleFromDeliveryID
					if body.Role == encoding.RoleSender {
						// the disposition frame is meant for a receiver
						// so look in the map for remote delivery IDs.
						handles = inputHandleFromRemoteDeliveryID
					}

					inputHandle, ok := handles[deliveryID]
					if !ok {
						debug.Log(2, "RX (Session %p): role %s: didn't find deliveryID %d in the delivery ID to input handle map", s, body.Role, deliveryID)
						continue
					}
					delete(handles, deliveryID)

					if body.Settled && body.Role == encoding.RoleReceiver {
						// check if settlement confirmation was requested, if so
						// confirm by closing channel (RSM == ModeFirst)
						if done, ok := settlementFromDeliveryID[deliveryID]; ok {
							delete(settlementFromDeliveryID, deliveryID)
							select {
							case done <- body.State:
							default:
							}
							close(done)
						}
					}

					// now find the *link for this input (remote) handle
					link, ok := linkFromInputHandle[inputHandle]
					if !ok {
						closeWithError(&Error{
							Condition:   ErrCondUnattachedHandle,
							Description: "received disposition frame referencing a handle that's not in use",
						}, fmt.Errorf("received disposition frame with unknown link input handle %d", inputHandle))
						continue
					}

					s.muxFrameToLink(link, fr)
				}
				continue
			case *frames.PerformFlow:
				if body.NextIncomingID == nil {
					// This is a protocol error:
					// "[...] MUST be set if the peer has received
					// the begin frame for the session"
					closeWithError(&Error{
						Condition:   ErrCondNotAllowed,
						Description: "next-incoming-id not set after session established",
					}, errors.New("protocol error: received flow without next-incoming-id after session established"))
					continue
				}

				// "When the endpoint receives a flow frame from its peer,
				// it MUST update the next-incoming-id directly from the
				// next-outgoing-id of the frame, and it MUST update the
				// remote-outgoing-window directly from the outgoing-window
				// of the frame."
				nextIncomingID = body.NextOutgoingID
				remoteOutgoingWindow = body.OutgoingWindow

				// "The remote-incoming-window is computed as follows:
				//
				// next-incoming-id(flow) + incoming-window(flow) - next-outgoing-id(endpoint)
				//
				// If the next-incoming-id field of the flow frame is not set, then remote-incoming-window is computed as follows:
				//
				// initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint)"
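				// note: the unsigned arithmetic below intentionally wraps
				// around; transfer IDs are serial numbers, so wraparound
				// still yields the correct window per the computation above.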
				remoteIncomingWindow = body.IncomingWindow - nextOutgoingID
				remoteIncomingWindow += *body.NextIncomingID
				debug.Log(3, "RX (Session %p): flow - remoteOutgoingWindow: %d remoteIncomingWindow: %d nextOutgoingID: %d", s, remoteOutgoingWindow, remoteIncomingWindow, nextOutgoingID)

				// Send to link if handle is set
				if body.Handle != nil {
					link, ok := linkFromInputHandle[*body.Handle]
					if !ok {
						closeWithError(&Error{
							Condition:   ErrCondUnattachedHandle,
							Description: "received flow frame referencing a handle that's not in use",
						}, fmt.Errorf("received flow frame with unknown link handle %d", *body.Handle))
						continue
					}

					s.muxFrameToLink(link, fr)
					continue
				}
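
				// the peer requested an immediate flow update (echo). reply
				// with our current window state, unless we've already sent
				// the end performative.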
				if body.Echo && !closeInProgress {
					niID := nextIncomingID
					resp := &frames.PerformFlow{
						NextIncomingID: &niID,
						IncomingWindow: s.incomingWindow,
						NextOutgoingID: nextOutgoingID,
						OutgoingWindow: s.outgoingWindow,
					}
					s.txFrame(&frameContext{Ctx: context.Background()}, resp)
				}

			case *frames.PerformAttach:
				// On Attach response, the link should be looked up by name, then added
				// to the links map with the remote's handle contained in this
				// attach frame.
				//
				// Note body.Role is the remote peer's role, we reverse for the local key.
				s.linksMu.RLock()
				link, linkOk := s.linksByKey[linkKey{name: body.Name, role: !body.Role}]
				s.linksMu.RUnlock()
				if !linkOk {
					closeWithError(&Error{
						Condition:   ErrCondNotAllowed,
						Description: "received mismatched attach frame",
					}, fmt.Errorf("protocol error: received mismatched attach frame %+v", body))
					continue
				}

				// track the input (remote) handle number for this link.
				// note that it might be a different value than our output handle.
				link.inputHandle = body.Handle
				linkFromInputHandle[link.inputHandle] = link

				s.muxFrameToLink(link, fr)

				debug.Log(1, "RX (Session %p): link %s attached, input handle %d, output handle %d", s, link.key.name, link.inputHandle, link.outputHandle)

			case *frames.PerformTransfer:
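				// count the received transfer so we know when to replenish
				// the peer's view of our incoming window (see the flow frame
				// sent below once half the window has been consumed).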
				s.needFlowCount++
				// "Upon receiving a transfer, the receiving endpoint will
				// increment the next-incoming-id to match the implicit
				// transfer-id of the incoming transfer plus one, as well
				// as decrementing the remote-outgoing-window, and MAY
				// (depending on policy) decrement its incoming-window."
				nextIncomingID++
				// don't loop to intmax
				if remoteOutgoingWindow > 0 {
					remoteOutgoingWindow--
				}
				link, ok := linkFromInputHandle[body.Handle]
				if !ok {
					closeWithError(&Error{
						Condition:   ErrCondUnattachedHandle,
						Description: "received transfer frame referencing a handle that's not in use",
					}, fmt.Errorf("received transfer frame with unknown link handle %d", body.Handle))
					continue
				}

				s.muxFrameToLink(link, fr)

				// if this message is received unsettled and link rcv-settle-mode == second, add to inputHandleFromRemoteDeliveryID
				if !body.Settled && body.DeliveryID != nil && link.receiverSettleMode != nil && *link.receiverSettleMode == ReceiverSettleModeSecond {
					debug.Log(1, "RX (Session %p): adding handle %d to inputHandleFromRemoteDeliveryID. remote delivery ID: %d", s, body.Handle, *body.DeliveryID)
					inputHandleFromRemoteDeliveryID[*body.DeliveryID] = body.Handle
				}

				// Update peer's outgoing window if half has been consumed.
				if s.needFlowCount >= s.incomingWindow/2 && !closeInProgress {
					debug.Log(3, "RX (Session %p): channel %d: flow - s.needFlowCount(%d) >= s.incomingWindow(%d)/2\n", s, s.channel, s.needFlowCount, s.incomingWindow)
					s.needFlowCount = 0
					nID := nextIncomingID
					flow := &frames.PerformFlow{
						NextIncomingID: &nID,
						IncomingWindow: s.incomingWindow,
						NextOutgoingID: nextOutgoingID,
						OutgoingWindow: s.outgoingWindow,
					}
					s.txFrame(&frameContext{Ctx: context.Background()}, flow)
				}

			case *frames.PerformDetach:
				link, ok := linkFromInputHandle[body.Handle]
				if !ok {
					closeWithError(&Error{
						Condition:   ErrCondUnattachedHandle,
						Description: "received detach frame referencing a handle that's not in use",
					}, fmt.Errorf("received detach frame with unknown link handle %d", body.Handle))
					continue
				}
				s.muxFrameToLink(link, fr)

				// we received a detach frame and sent it to the link.
				// this was either the response to a client-side initiated
				// detach or our peer detached us. either way, now that
				// the link has processed the frame it's detached so we
				// are safe to clean up its state.
				delete(linkFromInputHandle, link.inputHandle)
				delete(deliveryIDFromOutputHandle, link.outputHandle)
				s.deallocateHandle(link)

			case *frames.PerformEnd:
				// there are two possibilities:
				// - this is the ack to a client-side Close()
				// - the peer is ending the session so we must ack

				if closeInProgress {
					return
				}

				// the peer ended the session with an error, save it and send the ack
				if body.Error != nil {
					s.doneErr = body.Error
				}

				fr := frames.PerformEnd{}
				s.txFrame(&frameContext{Ctx: context.Background()}, &fr)

				// per spec, when end is received, we're no longer allowed to receive frames
				return

			default:
				debug.Log(1, "RX (Session %p): unexpected frame: %s\n", s, body)
				closeWithError(&Error{
					Condition:   ErrCondInternalError,
					Description: "session received unexpected frame",
				}, fmt.Errorf("internal error: unexpected frame %T", body))
			}

		case env := <-txTransfer:
			fr := &env.Frame
			// record current delivery ID
			var deliveryID uint32
			if fr.DeliveryID == &needsDeliveryID {
				deliveryID = nextDeliveryID
				fr.DeliveryID = &deliveryID
				nextDeliveryID++
				deliveryIDFromOutputHandle[fr.Handle] = deliveryID

				if !fr.Settled {
					inputHandleFromDeliveryID[deliveryID] = env.InputHandle
				}
			} else {
				// if fr.DeliveryID is nil it must have been added
				// to deliveryIDFromOutputHandle already (multi-frame transfer)
				deliveryID = deliveryIDFromOutputHandle[fr.Handle]
			}

			// log after the delivery ID has been assigned
			debug.Log(2, "TX (Session %p): %d, %s", s, s.channel, fr)

			// frame has been sender-settled, remove from map.
			// this should only come into play for multi-frame transfers.
			if fr.Settled {
				delete(inputHandleFromDeliveryID, deliveryID)
			}

			s.txFrame(env.FrameCtx, fr)

			select {
			case <-env.FrameCtx.Done:
				if env.FrameCtx.Err != nil {
					// transfer wasn't sent, don't update state
					continue
				}
				// transfer was written to the network
			case <-s.conn.done:
				// the write failed, Conn is going down
				continue
			}

			// if not settled, add done chan to map
			if !fr.Settled && fr.Done != nil {
				settlementFromDeliveryID[deliveryID] = fr.Done
			} else if fr.Done != nil {
				// sender-settled, close done now that the transfer has been sent
				close(fr.Done)
			}

			// "Upon sending a transfer, the sending endpoint will increment
			// its next-outgoing-id, decrement its remote-incoming-window,
			// and MAY (depending on policy) decrement its outgoing-window."
			nextOutgoingID++
			// don't decrement if we're at 0 or we could loop to int max
			if remoteIncomingWindow != 0 {
				remoteIncomingWindow--
			}

		case env := <-tx:
			fr := env.FrameBody
			debug.Log(2, "TX (Session %p): %d, %s", s, s.channel, fr)
			switch fr := env.FrameBody.(type) {
			case *frames.PerformDisposition:
				if fr.Settled && fr.Role == encoding.RoleSender {
					// sender with a peer that's in mode second; sending confirmation of disposition.
					// disposition frames can reference a range of delivery IDs, although it's highly
					// likely in this case there will only be one.
					start := fr.First
					end := start
					if fr.Last != nil {
						end = *fr.Last
					}
					for deliveryID := start; deliveryID <= end; deliveryID++ {
						// send delivery state to the channel and close it to signal
						// that the delivery has completed (RSM == ModeSecond)
						if done, ok := settlementFromDeliveryID[deliveryID]; ok {
							delete(settlementFromDeliveryID, deliveryID)
							select {
							case done <- fr.State:
							default:
							}
							close(done)
						}
					}
				}
				s.txFrame(env.FrameCtx, fr)
			case *frames.PerformFlow:
				niID := nextIncomingID
				fr.NextIncomingID = &niID
				fr.IncomingWindow = s.incomingWindow
				fr.NextOutgoingID = nextOutgoingID
				fr.OutgoingWindow = s.outgoingWindow
				s.txFrame(env.FrameCtx, fr)
			case *frames.PerformTransfer:
				panic("transfer frames must use txTransfer")
			default:
				s.txFrame(env.FrameCtx, fr)
			}
		}
	}
}

func (s *Session) allocateHandle(ctx context.Context, l *link) error {
	s.linksMu.Lock()
	defer s.linksMu.Unlock()

	// Check if link name already exists, if so then an error should be returned
	existing := s.linksByKey[l.key]
	if existing != nil {
		return fmt.Errorf("link with name '%v' already exists", l.key.name)
	}

	next, ok := s.outputHandles.Next()
	if !ok {
		if err := s.Close(ctx); err != nil {
			return err
		}
		// handle numbers are zero-based, report the actual count
		return &SessionError{inner: fmt.Errorf("reached session handle max (%d)", s.handleMax+1)}
	}

	l.outputHandle = next   // allocate handle to the link
	s.linksByKey[l.key] = l // add to mapping

	return nil
}

func (s *Session) deallocateHandle(l *link) {
	s.linksMu.Lock()
	defer s.linksMu.Unlock()

	delete(s.linksByKey, l.key)
	s.outputHandles.Remove(l.outputHandle)
}
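
// abandonLink records a link for deferred cleanup; freeAbandonedLinks
// later sends a closing detach for each abandoned link.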
func (s *Session) abandonLink(l *link) {
	s.abandonedLinksMu.Lock()
	defer s.abandonedLinksMu.Unlock()
	s.abandonedLinks = append(s.abandonedLinks, l)
}

func (s *Session) freeAbandonedLinks(ctx context.Context) error {
	s.abandonedLinksMu.Lock()
	defer s.abandonedLinksMu.Unlock()

	debug.Log(3, "TX (Session %p): cleaning up %d abandoned links", s, len(s.abandonedLinks))

	for _, l := range s.abandonedLinks {
		dr := &frames.PerformDetach{
			Handle: l.outputHandle,
			Closed: true,
		}
		if err := s.txFrameAndWait(ctx, dr); err != nil {
			return err
		}
	}

	s.abandonedLinks = nil
	return nil
}
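
// muxFrameToLink routes a frame received on the session to the
// specified link by enqueuing it on the link's RX queue.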
func (s *Session) muxFrameToLink(l *link, fr frames.FrameBody) {
	q := l.rxQ.Acquire()
	q.Enqueue(fr)
	l.rxQ.Release(q)
	debug.Log(2, "RX (Session %p): mux frame to link (%p): %s, %s", s, l, l.key.name, fr)
}

// transferEnvelope is used by senders to send transfer frames
type transferEnvelope struct {
	FrameCtx *frameContext

	// the link's remote handle
	InputHandle uint32

	Frame frames.PerformTransfer
}

// frameBodyEnvelope is used by senders and receivers to send frames.
type frameBodyEnvelope struct {
	FrameCtx  *frameContext
	FrameBody frames.FrameBody
}

// the address of this var is a sentinel value indicating
// that a transfer frame is in need of a delivery ID
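// (a sender sets PerformTransfer.DeliveryID to &needsDeliveryID and the
// session mux assigns the real ID; see the txTransfer case in Session.mux)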
var needsDeliveryID uint32