* Add unit tests for client

Return nil *Client if Dial() fails.
Moved tests in client_test.go to link_test.go as they are link tests.
Moved DetachError from client.go to errors.go.
Cleaned up some ambiguous error strings.
Set default values for PerformOpen in mock connection.
Changed some Printf to debug.
Fixed a hang when deallocating a session whose mux isn't running.
Added various comments.

* fix missing MaxInt on Go 1.16

* fix handling of nil RemoteChannel

* switch to testify for condition checks

* add remote channel parameter when encoding mock frames

* fix type-o
This commit is contained in:
Joel Hendrix 2021-10-07 16:25:30 -07:00 коммит произвёл GitHub
Родитель 51a92ef5d3
Коммит 1c4a2b0576
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
11 изменённых файлов: 566 добавлений и 128 удалений

Просмотреть файл

@ -1,7 +1,6 @@
package amqp
import (
"context"
"encoding/base64"
"errors"
"fmt"
@ -46,7 +45,10 @@ func Dial(addr string, opts ...ConnOption) (*Client, error) {
return nil, err
}
err = c.Start()
return &Client{conn: c}, err
if err != nil {
return nil, err
}
return &Client{conn: c}, nil
}
// New establishes an AMQP client connection over conn.
@ -65,6 +67,7 @@ func (c *Client) Close() error {
}
// NewSession opens a new AMQP session to the server.
// Returns ErrConnClosed if the underlying connection has been closed.
func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
// get a session allocated by Client.mux
var sResp newSessionResp
@ -82,7 +85,9 @@ func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
for _, opt := range opts {
err := opt(s)
if err != nil {
_ = s.Close(context.Background()) // deallocate session on error
// deallocate session on error. we can't call
// s.Close() as the session mux hasn't started yet.
c.conn.DelSession <- s
return nil, err
}
}
@ -108,7 +113,13 @@ func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
begin, ok := fr.Body.(*frames.PerformBegin)
if !ok {
_ = s.Close(context.Background()) // deallocate session on error
// this codepath is hard to hit (impossible?). if the response isn't a PerformBegin and we've not
// yet seen the remote channel number, the default clause in conn.mux will protect us from that.
// if we have seen the remote channel number then it's likely the session.mux for that channel will
// either swallow the frame or blow up in some other way, both causing this call to hang.
// deallocate session on error. we can't call
// s.Close() as the session mux hasn't started yet.
c.conn.DelSession <- s
return nil, fmt.Errorf("unexpected begin response: %+v", fr.Body)
}
@ -190,24 +201,6 @@ func randString(n int) string {
return base64.RawURLEncoding.EncodeToString(b)
}
// DetachError is returned by a link (Receiver/Sender) when a detach frame is received.
//
// RemoteError will be nil if the link was detached gracefully.
type DetachError struct {
RemoteError *Error
}
func (e *DetachError) Error() string {
return fmt.Sprintf("link detached, reason: %+v", e.RemoteError)
}
// Default link options
const (
DefaultLinkCredit = 1
DefaultLinkBatching = false
DefaultLinkBatchMaxAge = 5 * time.Second
)
// linkKey uniquely identifies a link on a connection by name and direction.
//
// A link can be identified uniquely by the ordered tuple

Просмотреть файл

@ -1,111 +1,409 @@
package amqp
import (
"encoding/binary"
"errors"
"fmt"
"math"
"testing"
"time"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/mocks"
"github.com/stretchr/testify/require"
)
func TestLinkOptions(t *testing.T) {
tests := []struct {
label string
opts []LinkOption
type mockDialer struct {
resp func(frames.FrameBody) ([]byte, error)
}
wantSource *frames.Source
wantProperties map[encoding.Symbol]interface{}
func (m mockDialer) NetDialerDial(c *conn, host, port string) error {
c.net = mocks.NewNetConn(m.resp)
return nil
}
func (mockDialer) TLSDialWithDialer(c *conn, host, port string) error {
panic("nyi")
}
func TestClientDial(t *testing.T) {
responder := func(req frames.FrameBody) ([]byte, error) {
switch req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
return mocks.PerformOpen("container")
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
client, err := Dial("amqp://localhost", connDialer(mockDialer{resp: responder}))
require.NoError(t, err)
require.NotNil(t, client)
// error case
responder = func(req frames.FrameBody) ([]byte, error) {
switch req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
return nil, errors.New("mock read failed")
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
client, err = Dial("amqp://localhost", connDialer(mockDialer{resp: responder}))
require.Error(t, err)
require.Nil(t, client)
}
func TestClientClose(t *testing.T) {
responder := func(req frames.FrameBody) ([]byte, error) {
switch req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
return mocks.PerformOpen("container")
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
client, err := Dial("amqp://localhost", connDialer(mockDialer{resp: responder}))
require.NoError(t, err)
require.NotNil(t, client)
time.Sleep(100 * time.Millisecond)
require.NoError(t, client.Close())
time.Sleep(100 * time.Millisecond)
require.NoError(t, client.Close())
}
func TestSessionOptions(t *testing.T) {
const (
// MaxInt added in Go 1.17, this is copied from there
intSize = 32 << (^uint(0) >> 63) // 32 or 64
MaxInt = 1<<(intSize-1) - 1
)
tests := []struct {
label string
opt SessionOption
verify func(t *testing.T, s *Session)
fails bool
}{
{
label: "no options",
},
{
label: "link-filters",
opts: []LinkOption{
LinkSelectorFilter("amqp.annotation.x-opt-offset > '100'"),
LinkProperty("x-opt-test1", "test1"),
LinkProperty("x-opt-test2", "test2"),
LinkProperty("x-opt-test1", "test3"),
LinkPropertyInt64("x-opt-test4", 1),
LinkPropertyInt32("x-opt-test5", 2),
LinkSourceFilter("com.microsoft:session-filter", 0x00000137000000C, "123"),
},
wantSource: &frames.Source{
Filter: map[encoding.Symbol]*encoding.DescribedType{
"apache.org:selector-filter:string": {
Descriptor: binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x46, 0x8C, 0x00, 0x00, 0x00, 0x04}),
Value: "amqp.annotation.x-opt-offset > '100'",
},
"com.microsoft:session-filter": {
Descriptor: binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x00, 0x13, 0x70, 0x00, 0x00, 0x0C}),
Value: "123",
},
},
},
wantProperties: map[encoding.Symbol]interface{}{
"x-opt-test1": "test3",
"x-opt-test2": "test2",
"x-opt-test4": int64(1),
"x-opt-test5": int32(2),
label: "SessionIncomingWindow",
opt: SessionIncomingWindow(5000),
verify: func(t *testing.T, s *Session) {
if s.incomingWindow != 5000 {
t.Errorf("unexpected incoming window %d", s.incomingWindow)
}
},
},
{
label: "more-link-filters",
opts: []LinkOption{
LinkSourceFilter("com.microsoft:session-filter", 0x00000137000000C, nil),
},
wantSource: &frames.Source{
Filter: map[encoding.Symbol]*encoding.DescribedType{
"com.microsoft:session-filter": {
Descriptor: binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x00, 0x13, 0x70, 0x00, 0x00, 0x0C}),
Value: nil,
},
},
label: "SessionOutgoingWindow",
opt: SessionOutgoingWindow(6000),
verify: func(t *testing.T, s *Session) {
if s.outgoingWindow != 6000 {
t.Errorf("unexpected outgoing window %d", s.outgoingWindow)
}
},
},
{
label: "link-source-capabilities",
opts: []LinkOption{
LinkSourceCapabilities("cap1", "cap2", "cap3"),
},
wantSource: &frames.Source{
Capabilities: []encoding.Symbol{"cap1", "cap2", "cap3"},
label: "SessionMaxLinksTooSmall",
opt: SessionMaxLinks(0),
fails: true,
},
{
label: "SessionMaxLinksTooLarge",
opt: SessionMaxLinks(MaxInt),
fails: true,
},
{
label: "SessionMaxLinks",
opt: SessionMaxLinks(4096),
verify: func(t *testing.T, s *Session) {
if s.handleMax != 4096-1 {
t.Errorf("unexpected max links %d", s.handleMax)
}
},
},
}
for _, tt := range tests {
t.Run(tt.label, func(t *testing.T) {
got, err := newLink(nil, nil, tt.opts)
if err != nil {
t.Fatal(err)
session := newSession(nil, 0)
err := tt.opt(session)
if err != nil && !tt.fails {
t.Error(err)
}
if !testEqual(got.Source, tt.wantSource) {
t.Errorf("Source properties don't match expected:\n %s", testDiff(got.Source, tt.wantSource))
}
if !testEqual(got.properties, tt.wantProperties) {
t.Errorf("Link properties don't match expected:\n %s", testDiff(got.properties, tt.wantProperties))
if !tt.fails {
tt.verify(t, session)
}
})
}
}
func TestSourceName(t *testing.T) {
expectedSourceName := "source-name"
opts := []LinkOption{
LinkName(expectedSourceName),
func TestClientNewSession(t *testing.T) {
const channelNum = 0
const incomingWindow = 5000
const outgoingWindow = 6000
responder := func(req frames.FrameBody) ([]byte, error) {
switch tt := req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
return mocks.PerformOpen("container")
case *frames.PerformBegin:
if tt.RemoteChannel != nil {
return nil, errors.New("expected nil remote channel")
}
if tt.IncomingWindow != incomingWindow {
return nil, fmt.Errorf("unexpected incoming window %d", tt.IncomingWindow)
}
if tt.OutgoingWindow != outgoingWindow {
return nil, fmt.Errorf("unexpected incoming window %d", tt.OutgoingWindow)
}
return mocks.PerformBegin(channelNum)
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := mocks.NewNetConn(responder)
got, err := newLink(nil, nil, opts)
if err != nil {
t.Fatal(err)
}
if got.Key.name != expectedSourceName {
t.Errorf("Link Source Name does not match expected: %v got: %v", expectedSourceName, got.Key.name)
}
client, err := New(netConn)
require.NoError(t, err)
session, err := client.NewSession(SessionIncomingWindow(incomingWindow), SessionOutgoingWindow(outgoingWindow))
require.NoError(t, err)
require.NotNil(t, session)
require.Equal(t, uint16(channelNum), session.channel)
time.Sleep(100 * time.Millisecond)
require.NoError(t, client.Close())
// creating a session after the connection has been closed returns nothing
session, err = client.NewSession()
require.Equal(t, ErrConnClosed, err)
require.Nil(t, session)
}
func TestClientMultipleSessions(t *testing.T) {
channelNum := uint16(0)
responder := func(req frames.FrameBody) ([]byte, error) {
switch req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
return mocks.PerformOpen("container")
case *frames.PerformBegin:
b, err := mocks.PerformBegin(channelNum)
channelNum++
return b, err
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
require.NoError(t, err)
// first session
session1, err := client.NewSession()
require.NoError(t, err)
require.NotNil(t, session1)
require.Equal(t, channelNum-1, session1.channel)
// second session
time.Sleep(100 * time.Millisecond)
session2, err := client.NewSession()
require.NoError(t, err)
require.NotNil(t, session2)
require.Equal(t, channelNum-1, session2.channel)
time.Sleep(100 * time.Millisecond)
require.NoError(t, client.Close())
}
func TestClientTooManySessions(t *testing.T) {
channelNum := uint16(0)
responder := func(req frames.FrameBody) ([]byte, error) {
switch req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
// return small number of max channels
return mocks.EncodeFrame(mocks.FrameAMQP, 0, &frames.PerformOpen{
ChannelMax: 1,
ContainerID: "test",
IdleTimeout: time.Minute,
MaxFrameSize: 4294967295,
})
case *frames.PerformBegin:
b, err := mocks.PerformBegin(channelNum)
channelNum++
return b, err
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
require.NoError(t, err)
for i := uint16(0); i < 3; i++ {
session, err := client.NewSession()
if i < 2 {
require.NoError(t, err)
require.NotNil(t, session)
} else {
// third channel should fail
require.Error(t, err)
require.Nil(t, session)
}
}
time.Sleep(100 * time.Millisecond)
require.NoError(t, client.Close())
}
func TestClientNewSessionInvalidOption(t *testing.T) {
responder := func(req frames.FrameBody) ([]byte, error) {
switch req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
return mocks.PerformOpen("container")
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
require.NoError(t, err)
session, err := client.NewSession(SessionMaxLinks(0))
require.Error(t, err)
require.Nil(t, session)
time.Sleep(100 * time.Millisecond)
require.NoError(t, client.Close())
}
func TestClientNewSessionMissingRemoteChannel(t *testing.T) {
responder := func(req frames.FrameBody) ([]byte, error) {
switch req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
return mocks.PerformOpen("container")
case *frames.PerformBegin:
// return begin with nil RemoteChannel
return mocks.EncodeFrame(mocks.FrameAMQP, 0, &frames.PerformBegin{
NextOutgoingID: 1,
IncomingWindow: 5000,
OutgoingWindow: 1000,
HandleMax: math.MaxInt16,
})
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
require.NoError(t, err)
session, err := client.NewSession(SessionMaxLinks(1))
require.Error(t, err)
require.Nil(t, session)
time.Sleep(100 * time.Millisecond)
require.Error(t, client.Close())
}
func TestClientNewSessionInvalidInitialResponse(t *testing.T) {
responder := func(req frames.FrameBody) ([]byte, error) {
switch req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
return mocks.PerformOpen("container")
case *frames.PerformBegin:
// respond with the wrong frame type
return mocks.PerformOpen("bad")
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
require.NoError(t, err)
session, err := client.NewSession()
require.Error(t, err)
require.Nil(t, session)
}
func TestClientNewSessionInvalidSecondResponseSameChannel(t *testing.T) {
t.Skip("test hangs due to session mux eating unexpected frames")
firstChan := true
responder := func(req frames.FrameBody) ([]byte, error) {
switch req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
return mocks.PerformOpen("container")
case *frames.PerformBegin:
if firstChan {
firstChan = false
return mocks.PerformBegin(0)
}
// respond with the wrong frame type
return mocks.PerformOpen("bad")
case *frames.PerformEnd:
return mocks.PerformEnd(0, nil)
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
require.NoError(t, err)
// fisrt session succeeds
session, err := client.NewSession()
require.NoError(t, err)
require.NotNil(t, session)
// second session fails
session, err = client.NewSession()
require.Error(t, err)
require.Nil(t, session)
time.Sleep(100 * time.Millisecond)
require.NoError(t, client.Close())
}
func TestClientNewSessionInvalidSecondResponseDifferentChannel(t *testing.T) {
firstChan := true
responder := func(req frames.FrameBody) ([]byte, error) {
switch req.(type) {
case *mocks.AMQPProto:
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
return mocks.PerformOpen("container")
case *frames.PerformBegin:
if firstChan {
firstChan = false
return mocks.PerformBegin(0)
}
// respond with the wrong frame type
// note that it has to be for the next channel
return mocks.PerformDisposition(1, 0, nil)
case *frames.PerformEnd:
return mocks.PerformEnd(0, nil)
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
require.NoError(t, err)
// fisrt session succeeds
session, err := client.NewSession()
require.NoError(t, err)
require.NotNil(t, session)
// second session fails
session, err = client.NewSession()
require.Error(t, err)
require.Nil(t, session)
time.Sleep(100 * time.Millisecond)
require.Error(t, client.Close())
}

17
conn.go
Просмотреть файл

@ -418,6 +418,7 @@ func (c *conn) mux() {
channels = bitmap.New(uint32(c.channelMax))
// create the next session to allocate
// note that channel always start at 0, and 0 is special and can't be deleted
nextChannel, _ = channels.Next()
nextSession = newSessionResp{session: newSession(c, uint16(nextChannel))}
@ -461,10 +462,14 @@ func (c *conn) mux() {
// RemoteChannel should be used when frame is Begin
case *frames.PerformBegin:
if body.RemoteChannel == nil {
// since we only support remotely-initiated sessions, this is an error
// TODO: it would be ideal to not have this kill the connection
c.err = fmt.Errorf("%T: nil RemoteChannel", fr.Body)
break
}
session, ok = sessionsByChannel[*body.RemoteChannel]
if !ok {
c.err = fmt.Errorf("unexpected remote channel number %d, expected %d", *body.RemoteChannel, nextChannel)
break
}
@ -473,10 +478,12 @@ func (c *conn) mux() {
default:
session, ok = sessionsByRemoteChannel[fr.Channel]
if !ok {
c.err = fmt.Errorf("%T: didn't find channel %d in sessionsByRemoteChannel", fr.Body, fr.Channel)
}
}
if !ok {
c.err = fmt.Errorf("unexpected frame: %#v", fr.Body)
continue
}
@ -845,7 +852,7 @@ func (c *conn) readProtoHeader() (protoHeader, error) {
case err := <-c.connErr:
return p, err
case fr := <-c.rxFrame:
return p, fmt.Errorf("unexpected frame %#v", fr)
return p, fmt.Errorf("readProtoHeader: unexpected frame %#v", fr)
case <-deadline:
return p, ErrTimeout
}
@ -917,7 +924,7 @@ func (c *conn) openAMQP() stateFunc {
}
o, ok := fr.Body.(*frames.PerformOpen)
if !ok {
c.err = fmt.Errorf("unexpected frame type %T", fr.Body)
c.err = fmt.Errorf("openAMQP: unexpected frame type %T", fr.Body)
return nil
}
debug(1, "RX: %s", o)
@ -949,7 +956,7 @@ func (c *conn) negotiateSASL() stateFunc {
}
sm, ok := fr.Body.(*frames.SASLMechanisms)
if !ok {
c.err = fmt.Errorf("unexpected frame type %T", fr.Body)
c.err = fmt.Errorf("negotiateSASL: unexpected frame type %T", fr.Body)
return nil
}
debug(1, "RX: %s", sm)
@ -981,7 +988,7 @@ func (c *conn) saslOutcome() stateFunc {
}
so, ok := fr.Body.(*frames.SASLOutcome)
if !ok {
c.err = fmt.Errorf("unexpected frame type %T", fr.Body)
c.err = fmt.Errorf("saslOutcome: unexpected frame type %T", fr.Body)
return nil
}
debug(1, "RX: %s", so)

Просмотреть файл

@ -459,7 +459,7 @@ func TestKeepAlives(t *testing.T) {
return []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}, nil
case *frames.PerformOpen:
// specify small idle timeout so we receive a lot of keep-alives
return mocks.EncodeFrame(mocks.FrameAMQP, &frames.PerformOpen{ContainerID: "container", IdleTimeout: 1 * time.Millisecond})
return mocks.EncodeFrame(mocks.FrameAMQP, 0, &frames.PerformOpen{ContainerID: "container", IdleTimeout: 1 * time.Millisecond})
case *mocks.KeepAlive:
keepAlives++
return nil, nil

Просмотреть файл

@ -1,6 +1,11 @@
package amqp
import "github.com/Azure/go-amqp/internal/encoding"
import (
"fmt"
"time"
"github.com/Azure/go-amqp/internal/encoding"
)
// Error Conditions
const (
@ -41,3 +46,21 @@ const (
type Error = encoding.Error
type ErrorCondition = encoding.ErrorCondition
// DetachError is returned by a link (Receiver/Sender) when a detach frame is received.
//
// RemoteError will be nil if the link was detached gracefully.
type DetachError struct {
RemoteError *Error
}
func (e *DetachError) Error() string {
return fmt.Sprintf("link detached, reason: %+v", e.RemoteError)
}
// Default link options
const (
DefaultLinkCredit = 1
DefaultLinkBatching = false
DefaultLinkBatchMaxAge = 5 * time.Second
)

Просмотреть файл

@ -173,13 +173,19 @@ func ProtoHeader(id ProtoID) ([]byte, error) {
// PerformOpen appends a PerformOpen frame with the specified container ID.
// This frame, and ProtoHeader, are needed when calling amqp.New() to create a client.
func PerformOpen(containerID string) ([]byte, error) {
return EncodeFrame(FrameAMQP, &frames.PerformOpen{ContainerID: containerID})
// send the default values for max channels and frame size
return EncodeFrame(FrameAMQP, 0, &frames.PerformOpen{
ChannelMax: 65535,
ContainerID: containerID,
IdleTimeout: time.Minute,
MaxFrameSize: 4294967295,
})
}
// PerformBegin appends a PerformBegin frame with the specified remote channel ID.
// This frame is needed when making a call to Client.NewSession().
func PerformBegin(remoteChannel uint16) ([]byte, error) {
return EncodeFrame(FrameAMQP, &frames.PerformBegin{
return EncodeFrame(FrameAMQP, remoteChannel, &frames.PerformBegin{
RemoteChannel: &remoteChannel,
NextOutgoingID: 1,
IncomingWindow: 5000,
@ -190,8 +196,8 @@ func PerformBegin(remoteChannel uint16) ([]byte, error) {
// ReceiverAttach appends a PerformAttach frame with the specified values.
// This frame is needed when making a call to Session.NewReceiver().
func ReceiverAttach(linkName string, linkHandle uint32, mode encoding.ReceiverSettleMode) ([]byte, error) {
return EncodeFrame(FrameAMQP, &frames.PerformAttach{
func ReceiverAttach(remoteChannel uint16, linkName string, linkHandle uint32, mode encoding.ReceiverSettleMode) ([]byte, error) {
return EncodeFrame(FrameAMQP, remoteChannel, &frames.PerformAttach{
Name: linkName,
Handle: linkHandle,
Role: encoding.RoleSender,
@ -207,7 +213,7 @@ func ReceiverAttach(linkName string, linkHandle uint32, mode encoding.ReceiverSe
// PerformTransfer appends a PerformTransfer frame with the specified values.
// The linkHandle MUST match the linkHandle value specified in ReceiverAttach.
func PerformTransfer(linkHandle, deliveryID uint32, payload []byte) ([]byte, error) {
func PerformTransfer(remoteChannel uint16, linkHandle, deliveryID uint32, payload []byte) ([]byte, error) {
format := uint32(0)
payloadBuf := &buffer.Buffer{}
encoding.WriteDescriptor(payloadBuf, encoding.TypeCodeApplicationData)
@ -215,7 +221,7 @@ func PerformTransfer(linkHandle, deliveryID uint32, payload []byte) ([]byte, err
if err != nil {
return nil, err
}
return EncodeFrame(FrameAMQP, &frames.PerformTransfer{
return EncodeFrame(FrameAMQP, remoteChannel, &frames.PerformTransfer{
Handle: linkHandle,
DeliveryID: &deliveryID,
DeliveryTag: []byte("tag"),
@ -226,8 +232,8 @@ func PerformTransfer(linkHandle, deliveryID uint32, payload []byte) ([]byte, err
// PerformDisposition appends a PerformDisposition frame with the specified values.
// The deliveryID MUST match the deliveryID value specified in PerformTransfer.
func PerformDisposition(deliveryID uint32, state encoding.DeliveryState) ([]byte, error) {
return EncodeFrame(FrameAMQP, &frames.PerformDisposition{
func PerformDisposition(remoteChannel uint16, deliveryID uint32, state encoding.DeliveryState) ([]byte, error) {
return EncodeFrame(FrameAMQP, remoteChannel, &frames.PerformDisposition{
Role: encoding.RoleSender,
First: deliveryID,
Settled: true,
@ -235,9 +241,14 @@ func PerformDisposition(deliveryID uint32, state encoding.DeliveryState) ([]byte
})
}
// PerformEnd encodes a PerformEnd frame with an optional error.
func PerformEnd(remoteChannel uint16, e *encoding.Error) ([]byte, error) {
return EncodeFrame(FrameAMQP, remoteChannel, &frames.PerformEnd{Error: e})
}
// PerformClose encodes a PerformClose frame with an optional error.
func PerformClose(e *encoding.Error) ([]byte, error) {
return EncodeFrame(FrameAMQP, &frames.PerformClose{Error: e})
return EncodeFrame(FrameAMQP, 0, &frames.PerformClose{Error: e})
}
// AMQPProto is the frame type passed to FrameCallback() for the initial protocal handshake.
@ -268,7 +279,7 @@ const (
)
// EncodeFrame encodes the specified frame to be sent over the wire.
func EncodeFrame(t FrameType, f frames.FrameBody) ([]byte, error) {
func EncodeFrame(t FrameType, remoteChannel uint16, f frames.FrameBody) ([]byte, error) {
bodyBuf := buffer.New([]byte{})
if err := encoding.Marshal(bodyBuf, f); err != nil {
return nil, err
@ -278,6 +289,7 @@ func EncodeFrame(t FrameType, f frames.FrameBody) ([]byte, error) {
Size: uint32(bodyBuf.Len()) + 8,
DataOffset: 2,
FrameType: uint8(t),
Channel: remoteChannel,
}
headerBuf := buffer.New([]byte{})
if err := encoding.Marshal(headerBuf, header); err != nil {

Просмотреть файл

@ -742,8 +742,8 @@ func (l *link) muxHandleFrame(fr frames.FrameBody) error {
_ = l.Session.txFrame(resp, nil)
default:
debug(1, "RX: %s", fr)
fmt.Printf("Unexpected frame: %s\n", fr)
// TODO: evaluate
debug(1, "muxHandleFrame: unexpected frame: %s\n", fr)
}
return nil

Просмотреть файл

@ -2,11 +2,13 @@ package amqp
import (
"context"
"encoding/binary"
"fmt"
"sync"
"testing"
"time"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/stretchr/testify/require"
)
@ -212,3 +214,105 @@ func newTestLink(t *testing.T) *link {
return l
}
func TestLinkOptions(t *testing.T) {
tests := []struct {
label string
opts []LinkOption
wantSource *frames.Source
wantProperties map[encoding.Symbol]interface{}
}{
{
label: "no options",
},
{
label: "link-filters",
opts: []LinkOption{
LinkSelectorFilter("amqp.annotation.x-opt-offset > '100'"),
LinkProperty("x-opt-test1", "test1"),
LinkProperty("x-opt-test2", "test2"),
LinkProperty("x-opt-test1", "test3"),
LinkPropertyInt64("x-opt-test4", 1),
LinkPropertyInt32("x-opt-test5", 2),
LinkSourceFilter("com.microsoft:session-filter", 0x00000137000000C, "123"),
},
wantSource: &frames.Source{
Filter: map[encoding.Symbol]*encoding.DescribedType{
"apache.org:selector-filter:string": {
Descriptor: binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x46, 0x8C, 0x00, 0x00, 0x00, 0x04}),
Value: "amqp.annotation.x-opt-offset > '100'",
},
"com.microsoft:session-filter": {
Descriptor: binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x00, 0x13, 0x70, 0x00, 0x00, 0x0C}),
Value: "123",
},
},
},
wantProperties: map[encoding.Symbol]interface{}{
"x-opt-test1": "test3",
"x-opt-test2": "test2",
"x-opt-test4": int64(1),
"x-opt-test5": int32(2),
},
},
{
label: "more-link-filters",
opts: []LinkOption{
LinkSourceFilter("com.microsoft:session-filter", 0x00000137000000C, nil),
},
wantSource: &frames.Source{
Filter: map[encoding.Symbol]*encoding.DescribedType{
"com.microsoft:session-filter": {
Descriptor: binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x00, 0x13, 0x70, 0x00, 0x00, 0x0C}),
Value: nil,
},
},
},
},
{
label: "link-source-capabilities",
opts: []LinkOption{
LinkSourceCapabilities("cap1", "cap2", "cap3"),
},
wantSource: &frames.Source{
Capabilities: []encoding.Symbol{"cap1", "cap2", "cap3"},
},
},
}
for _, tt := range tests {
t.Run(tt.label, func(t *testing.T) {
got, err := newLink(nil, nil, tt.opts)
if err != nil {
t.Fatal(err)
}
if !testEqual(got.Source, tt.wantSource) {
t.Errorf("Source properties don't match expected:\n %s", testDiff(got.Source, tt.wantSource))
}
if !testEqual(got.properties, tt.wantProperties) {
t.Errorf("Link properties don't match expected:\n %s", testDiff(got.properties, tt.wantProperties))
}
})
}
}
func TestSourceName(t *testing.T) {
expectedSourceName := "source-name"
opts := []LinkOption{
LinkName(expectedSourceName),
}
got, err := newLink(nil, nil, opts)
if err != nil {
t.Fatal(err)
}
if got.Key.name != expectedSourceName {
t.Errorf("Link Source Name does not match expected: %v got: %v", expectedSourceName, got.Key.name)
}
}

Просмотреть файл

@ -46,16 +46,16 @@ func TestReceive_ModeFirst(t *testing.T) {
case *frames.PerformBegin:
return mocks.PerformBegin(0)
case *frames.PerformAttach:
return mocks.ReceiverAttach(linkName, linkHandle, ModeFirst)
return mocks.ReceiverAttach(0, linkName, linkHandle, ModeFirst)
case *frames.PerformFlow:
if *ff.NextIncomingID == deliveryID {
// this is the first flow frame, send our payload
return mocks.PerformTransfer(linkHandle, deliveryID, []byte("hello"))
return mocks.PerformTransfer(0, linkHandle, deliveryID, []byte("hello"))
}
// ignore future flow frames as we have no response
return nil, nil
case *frames.PerformDisposition:
return mocks.PerformDisposition(deliveryID, &encoding.StateAccepted{})
return mocks.PerformDisposition(0, deliveryID, &encoding.StateAccepted{})
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}
@ -101,16 +101,16 @@ func TestReceive_ModeSecond(t *testing.T) {
case *frames.PerformBegin:
return mocks.PerformBegin(0)
case *frames.PerformAttach:
return mocks.ReceiverAttach(linkName, linkHandle, ModeSecond)
return mocks.ReceiverAttach(0, linkName, linkHandle, ModeSecond)
case *frames.PerformFlow:
if *ff.NextIncomingID == deliveryID {
// this is the first flow frame, send our payload
return mocks.PerformTransfer(linkHandle, deliveryID, []byte("hello"))
return mocks.PerformTransfer(0, linkHandle, deliveryID, []byte("hello"))
}
// ignore future flow frames as we have no response
return nil, nil
case *frames.PerformDisposition:
return mocks.PerformDisposition(deliveryID, &encoding.StateAccepted{})
return mocks.PerformDisposition(0, deliveryID, &encoding.StateAccepted{})
default:
return nil, fmt.Errorf("unhandled frame %T", req)
}

Просмотреть файл

@ -216,7 +216,7 @@ func (s saslXOAUTH2Handler) step() stateFunc {
return nil
}
default:
s.conn.err = fmt.Errorf("unexpected frame type %T", fr.Body)
s.conn.err = fmt.Errorf("sasl: unexpected frame type %T", fr.Body)
return nil
}
}

Просмотреть файл

@ -394,7 +394,8 @@ func (s *Session) mux(remoteBegin *frames.PerformBegin) {
return
default:
fmt.Printf("Unexpected frame: %s\n", body)
// TODO: evaluate
debug(1, "session mux: unexpected frame: %s\n", body)
}
case fr := <-txTransfer: