зеркало из https://github.com/Azure/go-amqp.git
717 строки
23 KiB
Go
717 строки
23 KiB
Go
package amqp
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/Azure/go-amqp/internal/encoding"
|
|
"github.com/Azure/go-amqp/internal/fake"
|
|
"github.com/Azure/go-amqp/internal/frames"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestSessionClose(t *testing.T) {
|
|
channelNum := uint16(0)
|
|
responder := func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
|
|
switch req.(type) {
|
|
case *fake.AMQPProto:
|
|
return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
|
|
case *frames.PerformOpen:
|
|
return newResponse(fake.PerformOpen("container"))
|
|
case *frames.PerformBegin:
|
|
b, err := fake.PerformBegin(uint16(channelNum), remoteChannel)
|
|
if err != nil {
|
|
return fake.Response{}, err
|
|
}
|
|
channelNum++
|
|
return fake.Response{Payload: b}, nil
|
|
case *frames.PerformEnd:
|
|
// channelNum was incremented
|
|
b, err := fake.PerformEnd(channelNum-1, nil)
|
|
if err != nil {
|
|
return fake.Response{}, err
|
|
}
|
|
channelNum--
|
|
return fake.Response{Payload: b}, nil
|
|
case *frames.PerformClose:
|
|
return newResponse(fake.PerformClose(nil))
|
|
default:
|
|
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
|
|
}
|
|
}
|
|
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
for i := 0; i < 4; i++ {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoErrorf(t, err, "iteration %d", i)
|
|
require.Equalf(t, uint16(0), session.channel, "iteration %d", i)
|
|
require.Equalf(t, channelNum-1, session.remoteChannel, "iteration %d", i)
|
|
require.Nil(t, session.Properties())
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.NoErrorf(t, err, "iteration %d", i)
|
|
}
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionServerClose(t *testing.T) {
|
|
responder := func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
|
|
switch req.(type) {
|
|
case *fake.AMQPProto:
|
|
return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
|
|
case *frames.PerformOpen:
|
|
return newResponse(fake.PerformOpen("container"))
|
|
case *frames.PerformBegin:
|
|
return newResponse(fake.PerformBegin(0, remoteChannel))
|
|
case *frames.PerformEnd:
|
|
return fake.Response{}, nil // swallow
|
|
case *frames.PerformClose:
|
|
return fake.Response{}, nil // swallow
|
|
default:
|
|
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
|
|
}
|
|
}
|
|
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
// initiate server-side closing of session
|
|
fr, err := fake.PerformEnd(0, &encoding.Error{Condition: "closing", Description: "server side close"})
|
|
require.NoError(t, err)
|
|
netConn.SendFrame(fr)
|
|
// wait a bit for connReader to read from the mock
|
|
time.Sleep(100 * time.Millisecond)
|
|
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.Error(t, err)
|
|
var sessionErr *SessionError
|
|
require.ErrorAs(t, err, &sessionErr)
|
|
require.NotNil(t, sessionErr.RemoteErr)
|
|
require.Equal(t, ErrCond("closing"), sessionErr.RemoteErr.Condition)
|
|
require.Equal(t, "server side close", sessionErr.RemoteErr.Description)
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionCloseTimeout(t *testing.T) {
|
|
responder := func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
|
|
switch req.(type) {
|
|
case *fake.AMQPProto:
|
|
return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
|
|
case *frames.PerformOpen:
|
|
return newResponse(fake.PerformOpen("container"))
|
|
case *frames.PerformBegin:
|
|
return newResponse(fake.PerformBegin(0, remoteChannel))
|
|
case *frames.PerformEnd:
|
|
b, err := fake.PerformEnd(0, nil)
|
|
if err != nil {
|
|
return fake.Response{}, err
|
|
}
|
|
// introduce a delay to trigger session close timeout
|
|
return fake.Response{Payload: b, WriteDelay: 1 * time.Second}, nil
|
|
case *frames.PerformClose:
|
|
return newResponse(fake.PerformClose(nil))
|
|
default:
|
|
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
|
|
}
|
|
}
|
|
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.ErrorIs(t, err, context.DeadlineExceeded)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
var sessionErr *SessionError
|
|
require.ErrorAs(t, err, &sessionErr)
|
|
require.Contains(t, sessionErr.Error(), context.DeadlineExceeded.Error())
|
|
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestConnCloseSessionClose(t *testing.T) {
|
|
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{
|
|
ChunkSize: 8,
|
|
})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
require.NoError(t, client.Close())
|
|
// closing the connection should close all sessions
|
|
select {
|
|
case <-session.done:
|
|
// session was closed
|
|
case <-time.After(500 * time.Millisecond):
|
|
t.Fatal("session wasn't closed")
|
|
}
|
|
|
|
rcv, err := session.NewReceiver(context.Background(), "blah", nil)
|
|
require.Nil(t, rcv)
|
|
var connErr *ConnError
|
|
require.ErrorAs(t, err, &connErr)
|
|
}
|
|
|
|
func TestSessionNewReceiverBadOptionFails(t *testing.T) {
|
|
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
recv, err := session.NewReceiver(ctx, "source", &ReceiverOptions{
|
|
Properties: map[string]any{
|
|
"": "bad_key",
|
|
},
|
|
})
|
|
cancel()
|
|
require.Error(t, err)
|
|
require.Nil(t, recv)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionNewReceiverMismatchedLinkName(t *testing.T) {
|
|
responder := func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
|
|
switch req.(type) {
|
|
case *fake.AMQPProto:
|
|
return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
|
|
case *frames.PerformOpen:
|
|
return newResponse(fake.PerformOpen("container"))
|
|
case *frames.PerformBegin:
|
|
return newResponse(fake.PerformBegin(0, remoteChannel))
|
|
case *frames.PerformEnd:
|
|
return newResponse(fake.PerformEnd(0, nil))
|
|
case *frames.PerformAttach:
|
|
return newResponse(fake.ReceiverAttach(0, "wrong_name", 0, ReceiverSettleModeFirst, nil))
|
|
case *frames.PerformClose:
|
|
return newResponse(fake.PerformClose(nil))
|
|
default:
|
|
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
|
|
}
|
|
}
|
|
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
recv, err := session.NewReceiver(ctx, "source", &ReceiverOptions{
|
|
Credit: 10,
|
|
})
|
|
cancel()
|
|
require.Error(t, err)
|
|
require.Nil(t, recv)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.Error(t, err)
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionNewSenderBadOptionFails(t *testing.T) {
|
|
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
snd, err := session.NewSender(ctx, "target", &SenderOptions{
|
|
Properties: map[string]any{
|
|
"": "bad_key",
|
|
},
|
|
})
|
|
cancel()
|
|
require.Error(t, err)
|
|
require.Nil(t, snd)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionNewSenderMismatchedLinkName(t *testing.T) {
|
|
responder := func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
|
|
switch req.(type) {
|
|
case *fake.AMQPProto:
|
|
return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
|
|
case *frames.PerformOpen:
|
|
return newResponse(fake.PerformOpen("container"))
|
|
case *frames.PerformBegin:
|
|
return newResponse(fake.PerformBegin(0, remoteChannel))
|
|
case *frames.PerformEnd:
|
|
return newResponse(fake.PerformEnd(0, nil))
|
|
case *frames.PerformAttach:
|
|
return newResponse(fake.SenderAttach(0, "wrong_name", 0, SenderSettleModeUnsettled))
|
|
case *frames.PerformClose:
|
|
return newResponse(fake.PerformClose(nil))
|
|
default:
|
|
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
|
|
}
|
|
}
|
|
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
snd, err := session.NewSender(ctx, "target", nil)
|
|
cancel()
|
|
require.Error(t, err)
|
|
require.Nil(t, snd)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.Error(t, err)
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionNewSenderDuplicateLinks(t *testing.T) {
|
|
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
snd, err := session.NewSender(ctx, "target", &SenderOptions{
|
|
Name: "test",
|
|
})
|
|
cancel()
|
|
require.NoError(t, err)
|
|
require.NotNil(t, snd)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
snd, err = session.NewSender(ctx, "target", &SenderOptions{
|
|
Name: "test",
|
|
})
|
|
cancel()
|
|
require.Error(t, err)
|
|
require.Nil(t, snd)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionNewSenderMaxHandles(t *testing.T) {
|
|
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, &SessionOptions{MaxLinks: 1})
|
|
cancel()
|
|
require.NoError(t, err)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
snd, err := session.NewSender(ctx, "target", &SenderOptions{
|
|
Name: "test1",
|
|
})
|
|
cancel()
|
|
require.NoError(t, err)
|
|
require.NotNil(t, snd)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
snd, err = session.NewSender(ctx, "target", &SenderOptions{
|
|
Name: "test2",
|
|
})
|
|
cancel()
|
|
require.Error(t, err)
|
|
require.Nil(t, snd)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionUnexpectedFrame(t *testing.T) {
|
|
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
// this frame causes the session to terminate
|
|
b, err := fake.EncodeFrame(frames.TypeSASL, 0, &frames.SASLMechanisms{})
|
|
require.NoError(t, err)
|
|
netConn.SendFrame(b)
|
|
|
|
// sleep for a bit so that the session mux has time to process the invalid frame before we close
|
|
time.Sleep(50 * time.Millisecond)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.Error(t, err)
|
|
var sessionErr *SessionError
|
|
require.ErrorAs(t, err, &sessionErr)
|
|
require.NotNil(t, sessionErr.inner)
|
|
require.ErrorContains(t, err, "unexpected frame *frames.SASLMechanisms")
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionInvalidFlowFrame(t *testing.T) {
|
|
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
// NextIncomingID cannot be nil once the session has been established
|
|
b, err := fake.EncodeFrame(frames.TypeAMQP, 0, &frames.PerformFlow{})
|
|
require.NoError(t, err)
|
|
netConn.SendFrame(b)
|
|
|
|
// wait a bit for connReader to read from the mock
|
|
time.Sleep(100 * time.Millisecond)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.Error(t, err)
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionFlowFrameWithEcho(t *testing.T) {
|
|
nextIncomingID := uint32(1)
|
|
const nextOutgoingID = 2
|
|
echo := make(chan struct{})
|
|
responder := func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
|
|
switch tt := req.(type) {
|
|
case *fake.AMQPProto:
|
|
return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
|
|
case *frames.PerformOpen:
|
|
return newResponse(fake.PerformOpen("container"))
|
|
case *frames.PerformBegin:
|
|
return newResponse(fake.PerformBegin(0, remoteChannel))
|
|
case *frames.PerformFlow:
|
|
defer func() { close(echo) }()
|
|
// here we receive the echo. verify state
|
|
if id := *tt.NextIncomingID; id != nextOutgoingID {
|
|
return fake.Response{}, fmt.Errorf("unexpected NextIncomingID %d", id)
|
|
}
|
|
if id := tt.NextOutgoingID; id != 0 {
|
|
return fake.Response{}, fmt.Errorf("unexpected NextOutgoingID %d", id)
|
|
}
|
|
if w := tt.IncomingWindow; w != defaultWindow {
|
|
return fake.Response{}, fmt.Errorf("unexpected IncomingWindow %d", w)
|
|
}
|
|
if w := tt.OutgoingWindow; w != defaultWindow {
|
|
return fake.Response{}, fmt.Errorf("unexpected OutgoingWindow %d", w)
|
|
}
|
|
return fake.Response{}, nil
|
|
case *frames.PerformEnd:
|
|
return newResponse(fake.PerformEnd(0, nil))
|
|
case *frames.PerformClose:
|
|
return newResponse(fake.PerformClose(nil))
|
|
default:
|
|
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
|
|
}
|
|
}
|
|
netConn := fake.NewNetConn(responder, fake.NetConnOptions{
|
|
ChunkSize: 10,
|
|
})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
b, err := fake.EncodeFrame(frames.TypeAMQP, 0, &frames.PerformFlow{
|
|
NextIncomingID: &nextIncomingID,
|
|
IncomingWindow: 100,
|
|
OutgoingWindow: 100,
|
|
NextOutgoingID: nextOutgoingID,
|
|
Echo: true,
|
|
})
|
|
require.NoError(t, err)
|
|
netConn.SendFrame(b)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestSessionInvalidAttachDeadlock(t *testing.T) {
|
|
var enqueueFrames func()
|
|
responder := func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
|
|
switch req.(type) {
|
|
case *fake.AMQPProto:
|
|
return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
|
|
case *frames.PerformOpen:
|
|
return newResponse(fake.PerformOpen("container"))
|
|
case *frames.PerformBegin:
|
|
return newResponse(fake.PerformBegin(0, remoteChannel))
|
|
case *frames.PerformEnd:
|
|
return newResponse(fake.PerformEnd(0, nil))
|
|
case *frames.PerformAttach:
|
|
enqueueFrames()
|
|
return fake.Response{}, nil
|
|
case *frames.PerformClose:
|
|
return newResponse(fake.PerformClose(nil))
|
|
default:
|
|
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
|
|
}
|
|
}
|
|
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
enqueueFrames = func() {
|
|
// send an invalid attach response
|
|
b, err := fake.EncodeFrame(frames.TypeAMQP, 0, &frames.PerformAttach{
|
|
Name: "mismatched",
|
|
Role: encoding.RoleReceiver,
|
|
})
|
|
require.NoError(t, err)
|
|
netConn.SendFrame(b)
|
|
// now follow up with a detach frame
|
|
b, err = fake.EncodeFrame(frames.TypeAMQP, 0, &frames.PerformDetach{
|
|
Error: &encoding.Error{
|
|
Condition: "boom",
|
|
Description: "failed",
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
netConn.SendFrame(b)
|
|
}
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
snd, err := session.NewSender(ctx, "target", nil)
|
|
cancel()
|
|
require.Error(t, err)
|
|
require.Nil(t, snd)
|
|
require.NoError(t, client.Close())
|
|
}
|
|
|
|
func TestNewSessionContextCancelled(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
responder := func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
|
|
switch req.(type) {
|
|
case *fake.AMQPProto:
|
|
return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
|
|
case *frames.PerformOpen:
|
|
return newResponse(fake.PerformOpen("container"))
|
|
case *frames.PerformClose:
|
|
return newResponse(fake.PerformClose(nil))
|
|
case *frames.PerformBegin:
|
|
cancel()
|
|
// swallow frame to prevent non-determinism of cancellation
|
|
return fake.Response{}, nil
|
|
case *fake.KeepAlive:
|
|
return fake.Response{}, nil
|
|
default:
|
|
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
|
|
}
|
|
}
|
|
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
|
|
|
|
newCtx, newCancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(newCtx, netConn, nil)
|
|
newCancel()
|
|
require.NoError(t, err)
|
|
|
|
session, err := client.NewSession(ctx, nil)
|
|
|
|
require.ErrorIs(t, err, context.Canceled)
|
|
require.Nil(t, session)
|
|
}
|
|
|
|
func TestSessionReceiveTransferNoHandle(t *testing.T) {
|
|
conn := fake.NewNetConn(receiverFrameHandlerNoUnhandled(0, ReceiverSettleModeFirst), fake.NetConnOptions{})
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, conn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
// send transfer when there's no link
|
|
b, err := fake.PerformTransfer(0, 0, 1, []byte("message 1"))
|
|
require.NoError(t, err)
|
|
conn.SendFrame(b)
|
|
|
|
// wait for the messages to "arrive"
|
|
time.Sleep(time.Second)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
var sessionErr *SessionError
|
|
require.ErrorAs(t, session.Close(ctx), &sessionErr)
|
|
require.Contains(t, sessionErr.Error(), "transfer frame with unknown link handle")
|
|
cancel()
|
|
}
|
|
|
|
func TestSessionReceiveDetachrNoHandle(t *testing.T) {
|
|
conn := fake.NewNetConn(receiverFrameHandlerNoUnhandled(0, ReceiverSettleModeFirst), fake.NetConnOptions{})
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, conn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
// send transfer when there's no link
|
|
b, err := fake.PerformDetach(0, 0, nil)
|
|
require.NoError(t, err)
|
|
conn.SendFrame(b)
|
|
|
|
// wait for the messages to "arrive"
|
|
time.Sleep(time.Second)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
var sessionErr *SessionError
|
|
require.ErrorAs(t, session.Close(ctx), &sessionErr)
|
|
require.Contains(t, sessionErr.Error(), "detach frame with unknown link handle")
|
|
cancel()
|
|
}
|
|
|
|
func TestSessionProperties(t *testing.T) {
|
|
responder := func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
|
|
switch req.(type) {
|
|
case *fake.AMQPProto:
|
|
return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
|
|
case *frames.PerformOpen:
|
|
return newResponse(fake.PerformOpen("container"))
|
|
case *frames.PerformBegin:
|
|
b, err := fake.EncodeFrame(frames.TypeAMQP, 0, &frames.PerformBegin{
|
|
RemoteChannel: &remoteChannel,
|
|
NextOutgoingID: 1,
|
|
IncomingWindow: 5000,
|
|
OutgoingWindow: 1000,
|
|
HandleMax: math.MaxInt16,
|
|
Properties: map[encoding.Symbol]any{
|
|
"SessionProperty1": 3.14159,
|
|
"SessionProperty2": 998877,
|
|
},
|
|
})
|
|
return newResponse(b, err)
|
|
case *frames.PerformEnd:
|
|
return newResponse(fake.PerformEnd(0, nil))
|
|
case *frames.PerformClose:
|
|
return newResponse(fake.PerformClose(nil))
|
|
default:
|
|
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
|
|
}
|
|
}
|
|
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
client, err := NewConn(ctx, netConn, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
session, err := client.NewSession(ctx, nil)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
require.Equal(t, map[string]any{
|
|
"SessionProperty1": 3.14159,
|
|
"SessionProperty2": int64(998877),
|
|
}, session.Properties())
|
|
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
|
err = session.Close(ctx)
|
|
cancel()
|
|
require.NoError(t, err)
|
|
require.NoError(t, client.Close())
|
|
}
|