Replace variadic config for conn with struct config (#160)

* Replace variadic config for conn with struct config

Added ConnOptions type which replicates the previous functional options
with the exception of ConnTLS which is redundant.
The SASL authentication mechanism is now singular instead of last one
wins.
Added SASLType for configuring SALS configuration.  The matching
configuration funcs have been renamed to match this type.

* fix linter issue

* update comment

* fix doc comments
This commit is contained in:
Joel Hendrix 2022-06-24 14:30:04 -07:00 коммит произвёл GitHub
Родитель e8e9f4581b
Коммит 69e69a86f1
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 285 добавлений и 325 удалений

Просмотреть файл

@ -5,6 +5,7 @@
### Features Added
* Added `ConnectionError` type that's returned when a connection is no longer functional.
* Added `LinkTargetCapabilities()` option to specify desired target capabilities.
* Added `SASLType` used when configuring the SASL authentication mechanism.
### Breaking Changes
* Removed `ErrConnClosed` and `ErrTimeout` sentinel error types.
@ -15,6 +16,11 @@
* `AMQPAddress`, `AMQPMessageID`, `AMQPSymbol`, `AMQPSequenceNumber`, `AMQPBinary`
* Various `Default*` constants are no longer exported.
* The args to `Receiver.ModifyMessage()` have changed.
* The "variadic config" pattern for `Client` constructors has been replaced with a struct-based config.
* This removes the `ConnOption` type and all of the associated configuration funcs.
* The `ConnTLS()` option was removed as part of this change.
* The `Dial()` and `New()` constructors now require an `*ConnOptions` parameter.
* The various SASL configuration funcs have been slightly renamed.
### Bugs Fixed
* Fixed potential panic in `muxHandleFrame()` when checking for manual creditor.

Просмотреть файл

@ -14,7 +14,7 @@ func BenchmarkSimple(b *testing.B) {
if localBrokerAddr == "" {
b.Skip()
}
client, err := amqp.Dial(localBrokerAddr)
client, err := amqp.Dial(localBrokerAddr, nil)
if err != nil {
b.Fatal(err)
}

Просмотреть файл

@ -27,8 +27,10 @@ type Client struct {
//
// If username and password information is not empty it's used as SASL PLAIN
// credentials, equal to passing ConnSASLPlain option.
func Dial(addr string, opts ...ConnOption) (*Client, error) {
c, err := dialConn(addr, opts...)
//
// opts: pass nil to accept the default values.
func Dial(addr string, opts *ConnOptions) (*Client, error) {
c, err := dialConn(addr, opts)
if err != nil {
return nil, err
}
@ -40,8 +42,9 @@ func Dial(addr string, opts ...ConnOption) (*Client, error) {
}
// New establishes an AMQP client connection over conn.
func New(conn net.Conn, opts ...ConnOption) (*Client, error) {
c, err := newConn(conn, opts...)
// opts: pass nil to accept the default values.
func New(conn net.Conn, opts *ConnOptions) (*Client, error) {
c, err := newConn(conn, opts)
if err != nil {
return nil, err
}

Просмотреть файл

@ -38,7 +38,7 @@ func TestClientDial(t *testing.T) {
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
client, err := Dial("amqp://localhost", connDialer(mockDialer{resp: responder}))
client, err := Dial("amqp://localhost", &ConnOptions{dialer: mockDialer{resp: responder}})
require.NoError(t, err)
require.NotNil(t, client)
// error case
@ -52,7 +52,7 @@ func TestClientDial(t *testing.T) {
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
client, err = Dial("amqp://localhost", connDialer(mockDialer{resp: responder}))
client, err = Dial("amqp://localhost", &ConnOptions{dialer: mockDialer{resp: responder}})
require.Error(t, err)
require.Nil(t, client)
}
@ -68,7 +68,7 @@ func TestClientClose(t *testing.T) {
return nil, fmt.Errorf("unhandled frame %T", req)
}
}
client, err := Dial("amqp://localhost", connDialer(mockDialer{resp: responder}))
client, err := Dial("amqp://localhost", &ConnOptions{dialer: mockDialer{resp: responder}})
require.NoError(t, err)
require.NotNil(t, client)
require.NoError(t, client.Close())
@ -167,7 +167,7 @@ func TestClientNewSession(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx, SessionIncomingWindow(incomingWindow), SessionOutgoingWindow(outgoingWindow))
@ -206,7 +206,7 @@ func TestClientMultipleSessions(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
// first session
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -249,7 +249,7 @@ func TestClientTooManySessions(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
for i := uint16(0); i < 3; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -270,7 +270,7 @@ func TestClientTooManySessions(t *testing.T) {
func TestClientNewSessionInvalidOption(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx, SessionMaxLinks(0))
@ -301,7 +301,7 @@ func TestClientNewSessionMissingRemoteChannel(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx, SessionMaxLinks(1))
@ -327,7 +327,7 @@ func TestClientNewSessionInvalidInitialResponse(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -360,7 +360,7 @@ func TestClientNewSessionInvalidSecondResponseSameChannel(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
// fisrt session succeeds
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -401,7 +401,7 @@ func TestClientNewSessionInvalidSecondResponseDifferentChannel(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
// fisrt session succeeds
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)

239
conn.go
Просмотреть файл

@ -26,132 +26,63 @@ const (
defaultMaxSessions = 65536
)
// ConnOption is a function for configuring an AMQP connection.
type ConnOption func(*conn) error
// ConnOptions contains the optional settings for configuring an AMQP connection.
type ConnOptions struct {
// ContainerID sets the container-id to use when opening the connection.
//
// A container ID will be randomly generated if this option is not used.
ContainerID string
// ConnServerHostname sets the hostname sent in the AMQP
// Open frame and TLS ServerName (if not otherwise set).
//
// This is useful when the AMQP connection will be established
// via a pre-established TLS connection as the server may not
// know which hostname the client is attempting to connect to.
func ConnServerHostname(hostname string) ConnOption {
return func(c *conn) error {
c.hostname = hostname
return nil
}
}
// HostName sets the hostname sent in the AMQP
// Open frame and TLS ServerName (if not otherwise set).
HostName string
// ConnTLS toggles TLS negotiation.
//
// Default: false.
func ConnTLS(enable bool) ConnOption {
return func(c *conn) error {
c.tlsNegotiation = enable
return nil
}
}
// IdleTimeout specifies the maximum period in milliseconds between
// receiving frames from the peer.
//
// Specify a value less than zero to disable idle timeout.
//
// Default: 1 minute.
IdleTimeout time.Duration
// ConnTLSConfig sets the tls.Config to be used during
// TLS negotiation.
//
// This option is for advanced usage, in most scenarios
// providing a URL scheme of "amqps://" or ConnTLS(true)
// is sufficient.
func ConnTLSConfig(tc *tls.Config) ConnOption {
return func(c *conn) error {
c.tlsConfig = tc
c.tlsNegotiation = true
return nil
}
}
// MaxFrameSize sets the maximum frame size that
// the connection will accept.
//
// Must be 512 or greater.
//
// Default: 512.
MaxFrameSize uint32
// ConnIdleTimeout specifies the maximum period between receiving
// frames from the peer.
//
// Resolution is milliseconds. A value of zero indicates no timeout.
// This setting is in addition to TCP keepalives.
//
// Default: 1 minute.
func ConnIdleTimeout(d time.Duration) ConnOption {
return func(c *conn) error {
if d < 0 {
return errors.New("idle timeout cannot be negative")
}
c.idleTimeout = d
return nil
}
}
// MaxSessions sets the maximum number of channels.
// The value must be greater than zero.
//
// Default: 65535.
MaxSessions uint16
// ConnMaxFrameSize sets the maximum frame size that
// the connection will accept.
//
// Must be 512 or greater.
//
// Default: 512.
func ConnMaxFrameSize(n uint32) ConnOption {
return func(c *conn) error {
if n < 512 {
return errors.New("max frame size must be 512 or greater")
}
c.maxFrameSize = n
return nil
}
}
// Properties sets an entry in the connection properties map sent to the server.
Properties map[string]interface{}
// ConnConnectTimeout configures how long to wait for the
// server during connection establishment.
//
// Once the connection has been established, ConnIdleTimeout
// applies. If duration is zero, no timeout will be applied.
//
// Default: 0.
func ConnConnectTimeout(d time.Duration) ConnOption {
return func(c *conn) error { c.connectTimeout = d; return nil }
}
// SASLType contains the specified SASL authentication mechanism.
SASLType SASLType
// ConnMaxSessions sets the maximum number of channels.
//
// n must be in the range 1 to 65536.
//
// Default: 65536.
func ConnMaxSessions(n int) ConnOption {
return func(c *conn) error {
if n < 1 {
return errors.New("max sessions cannot be less than 1")
}
if n > 65536 {
return errors.New("max sessions cannot be greater than 65536")
}
c.channelMax = uint16(n - 1)
return nil
}
}
// Timeout configures how long to wait for the
// server during connection establishment.
//
// Once the connection has been established, IdleTimeout
// applies. If duration is zero, no timeout will be applied.
//
// Default: 0.
Timeout time.Duration
// ConnProperty sets an entry in the connection properties map sent to the server.
//
// This option can be used multiple times.
func ConnProperty(key, value string) ConnOption {
return func(c *conn) error {
if key == "" {
return errors.New("connection property key must not be empty")
}
if c.properties == nil {
c.properties = make(map[encoding.Symbol]interface{})
}
c.properties[encoding.Symbol(key)] = value
return nil
}
}
// TLSConfig sets the tls.Config to be used during
// TLS negotiation.
//
// This option is for advanced usage, in most scenarios
// providing a URL scheme of "amqps://" is sufficient.
TLSConfig *tls.Config
// ConnContainerID sets the container-id to use when opening the connection.
//
// A container ID will be randomly generated if this option is not used.
func ConnContainerID(id string) ConnOption {
return func(c *conn) error {
c.containerID = id
return nil
}
// test hook
dialer dialer
}
// used to abstract the underlying dialer for testing purposes
@ -160,13 +91,6 @@ type dialer interface {
TLSDialWithDialer(c *conn, host, port string) error
}
func connDialer(d dialer) ConnOption {
return func(c *conn) error {
c.dialer = d
return nil
}
}
// conn is an AMQP connection.
// only exported fields and methods are part of public surface area,
// all others are considered to be internal implementation details.
@ -240,7 +164,7 @@ func (defaultDialer) TLSDialWithDialer(c *conn, host, port string) (err error) {
return
}
func dialConn(addr string, opts ...ConnOption) (*conn, error) {
func dialConn(addr string, opts *ConnOptions) (*conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
@ -253,21 +177,22 @@ func dialConn(addr string, opts ...ConnOption) (*conn, error) {
}
}
var cp ConnOptions
if opts != nil {
cp = *opts
}
// prepend SASL credentials when the user/pass segment is not empty
if u.User != nil {
pass, _ := u.User.Password()
opts = append([]ConnOption{
ConnSASLPlain(u.User.Username(), pass),
}, opts...)
cp.SASLType = SASLTypePlain(u.User.Username(), pass)
}
// append default options so user specified can overwrite
opts = append([]ConnOption{
connDialer(defaultDialer{}),
ConnServerHostname(host),
}, opts...)
if cp.HostName == "" {
cp.HostName = host
}
c, err := newConn(nil, opts...)
c, err := newConn(nil, &cp)
if err != nil {
return nil, err
}
@ -289,8 +214,9 @@ func dialConn(addr string, opts ...ConnOption) (*conn, error) {
return c, nil
}
func newConn(netConn net.Conn, opts ...ConnOption) (*conn, error) {
func newConn(netConn net.Conn, opts *ConnOptions) (*conn, error) {
c := &conn{
dialer: defaultDialer{},
net: netConn,
maxFrameSize: defaultMaxFrameSize,
PeerMaxFrameSize: defaultMaxFrameSize,
@ -311,11 +237,50 @@ func newConn(netConn net.Conn, opts ...ConnOption) (*conn, error) {
}
// apply options
for _, opt := range opts {
if err := opt(c); err != nil {
if opts == nil {
opts = &ConnOptions{}
}
if opts.ContainerID != "" {
c.containerID = opts.ContainerID
}
if opts.HostName != "" {
c.hostname = opts.HostName
}
if opts.IdleTimeout > 0 {
c.idleTimeout = opts.IdleTimeout
} else if opts.IdleTimeout < 0 {
c.idleTimeout = 0
}
if opts.MaxFrameSize > 0 && opts.MaxFrameSize < 512 {
return nil, fmt.Errorf("invalid MaxFrameSize value %d", opts.MaxFrameSize)
} else if opts.MaxFrameSize > 512 {
c.maxFrameSize = opts.MaxFrameSize
}
if opts.MaxSessions > 0 {
c.channelMax = opts.MaxSessions
}
if opts.SASLType != nil {
if err := opts.SASLType(c); err != nil {
return nil, err
}
}
if opts.Timeout > 0 {
c.connectTimeout = opts.Timeout
}
if opts.Properties != nil {
c.properties = make(map[encoding.Symbol]interface{})
for key, val := range opts.Properties {
c.properties[encoding.Symbol(key)] = val
}
}
if opts.TLSConfig != nil {
c.tlsConfig = opts.TLSConfig.Clone()
}
if opts.dialer != nil {
c.dialer = opts.dialer
}
return c, nil
}

Просмотреть файл

@ -17,7 +17,7 @@ import (
func TestConnOptions(t *testing.T) {
tests := []struct {
label string
opts []ConnOption
opts ConnOptions
verify func(t *testing.T, c *conn)
fails bool
}{
@ -27,10 +27,11 @@ func TestConnOptions(t *testing.T) {
},
{
label: "multiple properties",
opts: []ConnOption{
ConnProperty("x-opt-test1", "test1"),
ConnProperty("x-opt-test2", "test2"),
ConnProperty("x-opt-test1", "test3"),
opts: ConnOptions{
Properties: map[string]interface{}{
"x-opt-test1": "test3",
"x-opt-test2": "test2",
},
},
verify: func(t *testing.T, c *conn) {
wantProperties := map[encoding.Symbol]interface{}{
@ -38,14 +39,14 @@ func TestConnOptions(t *testing.T) {
"x-opt-test2": "test2",
}
if !testEqual(c.properties, wantProperties) {
t.Errorf("Properties don't match expected:\n %s", testDiff(c.properties, wantProperties))
require.Equal(t, wantProperties, c.properties)
}
},
},
{
label: "ConnServerHostname",
opts: []ConnOption{
ConnServerHostname("testhost"),
opts: ConnOptions{
HostName: "testhost",
},
verify: func(t *testing.T, c *conn) {
if c.hostname != "testhost" {
@ -53,21 +54,10 @@ func TestConnOptions(t *testing.T) {
}
},
},
{
label: "ConnTLS",
opts: []ConnOption{
ConnTLS(true),
},
verify: func(t *testing.T, c *conn) {
if !c.tlsNegotiation {
t.Error("expected TLS enabled")
}
},
},
{
label: "ConnTLSConfig",
opts: []ConnOption{
ConnTLSConfig(&tls.Config{MinVersion: tls.VersionTLS13}),
opts: ConnOptions{
TLSConfig: &tls.Config{MinVersion: tls.VersionTLS13},
},
verify: func(t *testing.T, c *conn) {
if c.tlsConfig.MinVersion != tls.VersionTLS13 {
@ -77,8 +67,8 @@ func TestConnOptions(t *testing.T) {
},
{
label: "ConnIdleTimeout_Valid",
opts: []ConnOption{
ConnIdleTimeout(15 * time.Minute),
opts: ConnOptions{
IdleTimeout: 15 * time.Minute,
},
verify: func(t *testing.T, c *conn) {
if c.idleTimeout != 15*time.Minute {
@ -89,14 +79,14 @@ func TestConnOptions(t *testing.T) {
{
label: "ConnIdleTimeout_Invalid",
fails: true,
opts: []ConnOption{
ConnIdleTimeout(-15 * time.Minute),
opts: ConnOptions{
IdleTimeout: -15 * time.Minute,
},
},
{
label: "ConnMaxFrameSize_Valid",
opts: []ConnOption{
ConnMaxFrameSize(1024),
opts: ConnOptions{
MaxFrameSize: 1024,
},
verify: func(t *testing.T, c *conn) {
if c.maxFrameSize != 1024 {
@ -107,14 +97,14 @@ func TestConnOptions(t *testing.T) {
{
label: "ConnMaxFrameSize_Invalid",
fails: true,
opts: []ConnOption{
ConnMaxFrameSize(128),
opts: ConnOptions{
MaxFrameSize: 128,
},
},
{
label: "ConnConnectTimeout",
opts: []ConnOption{
ConnConnectTimeout(5 * time.Minute),
opts: ConnOptions{
Timeout: 5 * time.Minute,
},
verify: func(t *testing.T, c *conn) {
if c.connectTimeout != 5*time.Minute {
@ -124,11 +114,11 @@ func TestConnOptions(t *testing.T) {
},
{
label: "ConnMaxSessions_Success",
opts: []ConnOption{
ConnMaxSessions(32768),
opts: ConnOptions{
MaxSessions: 32768,
},
verify: func(t *testing.T, c *conn) {
if c.channelMax != 32768-1 { // zero-based
if c.channelMax != 32768 {
t.Errorf("unexpected session count %d", c.channelMax)
}
},
@ -136,28 +126,14 @@ func TestConnOptions(t *testing.T) {
{
label: "ConnMaxSessions_TooSmall",
fails: true,
opts: []ConnOption{
ConnMaxSessions(0),
},
},
{
label: "ConnMaxSessions_TooBig",
fails: true,
opts: []ConnOption{
ConnMaxSessions(70000),
},
},
{
label: "ConnProperty_Invalid",
fails: true,
opts: []ConnOption{
ConnProperty("", "value"),
opts: ConnOptions{
MaxSessions: 0,
},
},
{
label: "ConnContainerID",
opts: []ConnOption{
ConnContainerID("myid"),
opts: ConnOptions{
ContainerID: "myid",
},
verify: func(t *testing.T, c *conn) {
if c.containerID != "myid" {
@ -169,7 +145,7 @@ func TestConnOptions(t *testing.T) {
for _, tt := range tests {
t.Run(tt.label, func(t *testing.T) {
got, err := newConn(nil, tt.opts...)
got, err := newConn(nil, &tt.opts)
if err != nil && !tt.fails {
t.Fatal(err)
}
@ -202,30 +178,30 @@ func (f fakeDialer) error() error {
}
func TestDialConn(t *testing.T) {
c, err := dialConn(":bad url/ value", connDialer(fakeDialer{}))
c, err := dialConn(":bad url/ value", &ConnOptions{dialer: fakeDialer{}})
require.Error(t, err)
require.Nil(t, c)
c, err = dialConn("http://localhost", connDialer(fakeDialer{}))
c, err = dialConn("http://localhost", &ConnOptions{dialer: fakeDialer{}})
require.Error(t, err)
require.Nil(t, c)
c, err = dialConn("amqp://localhost", connDialer(fakeDialer{}))
c, err = dialConn("amqp://localhost", &ConnOptions{dialer: fakeDialer{}})
require.NoError(t, err)
require.NotNil(t, c)
require.Nil(t, c.tlsConfig)
c, err = dialConn("amqps://localhost", connDialer(fakeDialer{}))
c, err = dialConn("amqps://localhost", &ConnOptions{dialer: fakeDialer{}})
require.NoError(t, err)
require.NotNil(t, c)
require.NotNil(t, c.tlsConfig)
c, err = dialConn("amqp://localhost:12345", connDialer(fakeDialer{}))
c, err = dialConn("amqp://localhost:12345", &ConnOptions{dialer: fakeDialer{}})
require.NoError(t, err)
require.NotNil(t, c)
c, err = dialConn("amqp://username:password@localhost", connDialer(fakeDialer{}))
c, err = dialConn("amqp://username:password@localhost", &ConnOptions{dialer: fakeDialer{}})
require.NoError(t, err)
require.NotNil(t, c)
if _, ok := c.saslHandlers[saslMechanismPLAIN]; !ok {
t.Fatal("missing SASL plain handler")
}
c, err = dialConn("amqp://localhost", connDialer(fakeDialer{fail: true}))
c, err = dialConn("amqp://localhost", &ConnOptions{dialer: fakeDialer{fail: true}})
require.Error(t, err)
require.Nil(t, c)
}
@ -306,7 +282,7 @@ func TestStart(t *testing.T) {
for _, tt := range tests {
t.Run(tt.label, func(t *testing.T) {
netConn := mocks.NewNetConn(tt.responder)
conn, err := newConn(netConn)
conn, err := newConn(netConn, nil)
require.NoError(t, err)
err = conn.Start()
if tt.fails && err == nil {
@ -320,13 +296,13 @@ func TestStart(t *testing.T) {
func TestClose(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
conn, err := newConn(netConn)
conn, err := newConn(netConn, nil)
require.NoError(t, err)
require.NoError(t, conn.Start())
require.NoError(t, conn.Close())
// with Close error
netConn = mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
conn, err = newConn(netConn)
conn, err = newConn(netConn, nil)
require.NoError(t, err)
require.NoError(t, conn.Start())
netConn.OnClose = func() error {
@ -339,7 +315,7 @@ func TestClose(t *testing.T) {
func TestServerSideClose(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
conn, err := newConn(netConn)
conn, err := newConn(netConn, nil)
require.NoError(t, err)
require.NoError(t, conn.Start())
fr, err := mocks.PerformClose(nil)
@ -349,7 +325,7 @@ func TestServerSideClose(t *testing.T) {
require.NoError(t, err)
// with error
netConn = mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
conn, err = newConn(netConn)
conn, err = newConn(netConn, nil)
require.NoError(t, err)
require.NoError(t, conn.Start())
fr, err = mocks.PerformClose(&Error{Condition: "Close", Description: "mock server error"})
@ -386,7 +362,7 @@ func TestKeepAlives(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
conn, err := newConn(netConn)
conn, err := newConn(netConn, nil)
require.NoError(t, err)
require.NoError(t, conn.Start())
// send keepalive
@ -404,7 +380,7 @@ func TestKeepAlives(t *testing.T) {
func TestConnReaderError(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
conn, err := newConn(netConn)
conn, err := newConn(netConn, nil)
require.NoError(t, err)
require.NoError(t, conn.Start())
// trigger some kind of error
@ -420,7 +396,7 @@ func TestConnReaderError(t *testing.T) {
func TestConnWriterError(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
conn, err := newConn(netConn)
conn, err := newConn(netConn, nil)
require.NoError(t, err)
require.NoError(t, conn.Start())
// send a frame that our responder doesn't handle to simulate a conn.connWriter error

Просмотреть файл

@ -11,9 +11,9 @@ import (
func Example() {
// Create client
client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net",
amqp.ConnSASLPlain("access-key-name", "access-key"),
)
client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net", &amqp.ConnOptions{
SASLType: amqp.SASLTypePlain("access-key-name", "access-key"),
})
if err != nil {
log.Fatal("Dialing AMQP server:", err)
}

Просмотреть файл

@ -18,10 +18,10 @@ import (
func fuzzConn(data []byte) int {
// Receive
client, err := New(testconn.New(data),
ConnSASLPlain("listen", "3aCXZYFcuZA89xe6lZkfYJvOPnTGipA3ap7NvPruBhI="),
ConnIdleTimeout(10*time.Millisecond),
)
client, err := New(testconn.New(data), &ConnOptions{
IdleTimeout: 10 * time.Millisecond,
SASLType: SASLTypePlain("listen", "3aCXZYFcuZA89xe6lZkfYJvOPnTGipA3ap7NvPruBhI="),
})
if err != nil {
return 0
}
@ -54,10 +54,10 @@ func fuzzConn(data []byte) int {
s.Close(ctx)
// Send
client, err = New(testconn.New(data),
ConnSASLPlain("listen", "3aCXZYFcuZA89xe6lZkfYJvOPnTGipA3ap7NvPruBhI="),
ConnIdleTimeout(10*time.Millisecond),
)
client, err = New(testconn.New(data), &ConnOptions{
IdleTimeout: 10 * time.Millisecond,
SASLType: SASLTypePlain("listen", "3aCXZYFcuZA89xe6lZkfYJvOPnTGipA3ap7NvPruBhI="),
})
if err != nil {
return 0
}

Просмотреть файл

@ -50,7 +50,7 @@ func TestIntegrationRoundTrip(t *testing.T) {
}
tests := []struct {
label string
sessions int
sessions uint16
data []string
}{
{
@ -88,13 +88,15 @@ func TestIntegrationRoundTrip(t *testing.T) {
checkLeaks := leaktest.CheckTimeout(t, 60*time.Second)
// Create client
client, err := amqp.Dial(localBrokerAddr, amqp.ConnMaxSessions(tt.sessions))
client, err := amqp.Dial(localBrokerAddr, &amqp.ConnOptions{
MaxSessions: tt.sessions,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()
for i := 0; i < tt.sessions; i++ {
for i := uint16(0); i < tt.sessions; i++ {
// Open a session
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -244,7 +246,7 @@ func TestIntegrationRoundTrip_Buffered(t *testing.T) {
checkLeaks := leaktest.CheckTimeout(t, 60*time.Second)
// Create client
client, err := amqp.Dial(localBrokerAddr)
client, err := amqp.Dial(localBrokerAddr, nil)
if err != nil {
t.Fatal(err)
}
@ -360,7 +362,7 @@ func TestIntegrationReceiverModeSecond(t *testing.T) {
checkLeaks := leaktest.CheckTimeout(t, 60*time.Second)
// Create client
client, err := amqp.Dial(localBrokerAddr)
client, err := amqp.Dial(localBrokerAddr, nil)
if err != nil {
t.Fatal(err)
}
@ -514,7 +516,7 @@ func TestIntegrationSessionHandleMax(t *testing.T) {
// checkLeaks := leaktest.CheckTimeout(t, 60*time.Second)
// Create client
client, err := amqp.Dial(localBrokerAddr)
client, err := amqp.Dial(localBrokerAddr, nil)
if err != nil {
t.Fatal(err)
}
@ -586,7 +588,7 @@ func TestIntegrationLinkName(t *testing.T) {
label := fmt.Sprintf("name %v", tt.name)
t.Run(label, func(t *testing.T) {
// Create client
client, err := amqp.Dial(localBrokerAddr)
client, err := amqp.Dial(localBrokerAddr, nil)
if err != nil {
t.Fatal(err)
}
@ -646,7 +648,7 @@ func TestIntegrationClose(t *testing.T) {
checkLeaks := leaktest.CheckTimeout(t, 60*time.Second)
// Create client
client, err := amqp.Dial(localBrokerAddr)
client, err := amqp.Dial(localBrokerAddr, nil)
if err != nil {
t.Fatal(err)
}
@ -692,7 +694,7 @@ func TestIntegrationClose(t *testing.T) {
checkLeaks := leaktest.CheckTimeout(t, 60*time.Second)
// Create client
client, err := amqp.Dial(localBrokerAddr)
client, err := amqp.Dial(localBrokerAddr, nil)
if err != nil {
t.Fatal(err)
}
@ -741,7 +743,7 @@ func TestIntegrationClose(t *testing.T) {
checkLeaks := leaktest.CheckTimeout(t, 60*time.Second)
// Create client
client, err := amqp.Dial(localBrokerAddr)
client, err := amqp.Dial(localBrokerAddr, nil)
if err != nil {
t.Fatal(err)
}
@ -793,7 +795,7 @@ func TestMultipleSessionsOpenClose(t *testing.T) {
//checkLeaks := leaktest.Check(t)
// Create client
client, err := amqp.Dial(localBrokerAddr)
client, err := amqp.Dial(localBrokerAddr, nil)
if err != nil {
t.Fatal(err)
}
@ -831,7 +833,7 @@ func TestConcurrentSessionsOpenClose(t *testing.T) {
//checkLeaks := leaktest.Check(t)
// Create client
client, err := amqp.Dial(localBrokerAddr)
client, err := amqp.Dial(localBrokerAddr, nil)
if err != nil {
t.Fatal(err)
}

Просмотреть файл

@ -346,7 +346,7 @@ func TestSessionFlowDisablesTransfer(t *testing.T) {
nextIncomingID := uint32(0)
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
@ -374,7 +374,7 @@ func TestSessionFlowDisablesTransfer(t *testing.T) {
func TestExactlyOnceDoesntWork(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)

Просмотреть файл

@ -16,7 +16,7 @@ import (
func TestReceiverInvalidOptions(t *testing.T) {
conn := mocks.NewNetConn(receiverFrameHandlerNoUnhandled(ModeFirst))
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -63,7 +63,7 @@ func TestReceiverMethodsNoReceive(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -91,7 +91,7 @@ func TestReceiverMethodsNoReceive(t *testing.T) {
func TestReceiverLinkSourceFilter(t *testing.T) {
conn := mocks.NewNetConn(receiverFrameHandlerNoUnhandled(ModeFirst))
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -113,7 +113,7 @@ func TestReceiverLinkSourceFilter(t *testing.T) {
func TestReceiverOnClosed(t *testing.T) {
conn := mocks.NewNetConn(receiverFrameHandlerNoUnhandled(ModeFirst))
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -144,7 +144,7 @@ func TestReceiverOnClosed(t *testing.T) {
func TestReceiverOnSessionClosed(t *testing.T) {
conn := mocks.NewNetConn(receiverFrameHandlerNoUnhandled(ModeFirst))
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -175,7 +175,7 @@ func TestReceiverOnSessionClosed(t *testing.T) {
func TestReceiverOnConnClosed(t *testing.T) {
conn := mocks.NewNetConn(receiverFrameHandlerNoUnhandled(ModeFirst))
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -206,7 +206,7 @@ func TestReceiverOnConnClosed(t *testing.T) {
func TestReceiverOnDetached(t *testing.T) {
conn := mocks.NewNetConn(receiverFrameHandlerNoUnhandled(ModeFirst))
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -263,7 +263,7 @@ func TestReceiveInvalidMessage(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -385,7 +385,7 @@ func TestReceiveSuccessModeFirst(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -442,7 +442,7 @@ func TestReceiveSuccessModeSecondAccept(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -520,7 +520,7 @@ func TestReceiveSuccessModeSecondReject(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -592,7 +592,7 @@ func TestReceiveSuccessModeSecondRelease(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -669,7 +669,7 @@ func TestReceiveSuccessModeSecondModify(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -771,7 +771,7 @@ func TestReceiveMultiFrameMessageSuccess(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -848,7 +848,7 @@ func TestReceiveInvalidMultiFrameMessage(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -958,7 +958,7 @@ func TestReceiveMultiFrameMessageAborted(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -1017,7 +1017,7 @@ func TestReceiveMessageTooBig(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -1060,7 +1060,7 @@ func TestReceiveSuccessAcceptFails(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -1123,7 +1123,7 @@ func TestReceiverDispositionBatcherTimer(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -1184,7 +1184,7 @@ func TestReceiverDispositionBatcherFull(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -1253,7 +1253,7 @@ func TestReceiverDispositionBatcherRelease(t *testing.T) {
}
}
conn := mocks.NewNetConn(responder)
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -1299,7 +1299,7 @@ func TestReceiverDispositionBatcherRelease(t *testing.T) {
func TestReceiverCloseOnUnsettledWithPending(t *testing.T) {
conn := mocks.NewNetConn(receiverFrameHandlerNoUnhandled(ModeFirst))
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -1330,7 +1330,7 @@ func TestReceiverCloseOnUnsettledWithPending(t *testing.T) {
func TestReceiverConnReaderError(t *testing.T) {
conn := mocks.NewNetConn(receiverFrameHandlerNoUnhandled(ModeFirst))
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)
@ -1364,7 +1364,7 @@ func TestReceiverConnReaderError(t *testing.T) {
func TestReceiverConnWriterError(t *testing.T) {
conn := mocks.NewNetConn(receiverFrameHandlerNoUnhandled(ModeFirst))
client, err := New(conn)
client, err := New(conn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx)

11
sasl.go
Просмотреть файл

@ -21,11 +21,14 @@ const (
frameTypeSASL = 0x1
)
// SASLType represents a SASL configuration to use during authentication.
type SASLType func(c *conn) error
// ConnSASLPlain enables SASL PLAIN authentication for the connection.
//
// SASL PLAIN transmits credentials in plain text and should only be used
// on TLS/SSL enabled connection.
func ConnSASLPlain(username, password string) ConnOption {
func SASLTypePlain(username, password string) SASLType {
// TODO: how widely used is hostname? should it be supported
return func(c *conn) error {
// make handlers map if no other mechanism has
@ -58,7 +61,7 @@ func ConnSASLPlain(username, password string) ConnOption {
}
// ConnSASLAnonymous enables SASL ANONYMOUS authentication for the connection.
func ConnSASLAnonymous() ConnOption {
func SASLTypeAnonymous() SASLType {
return func(c *conn) error {
// make handlers map if no other mechanism has
if c.saslHandlers == nil {
@ -90,7 +93,7 @@ func ConnSASLAnonymous() ConnOption {
// ConnSASLExternal enables SASL EXTERNAL authentication for the connection.
// The value for resp is dependent on the type of authentication (empty string is common for TLS).
// See https://datatracker.ietf.org/doc/html/rfc4422#appendix-A for additional info.
func ConnSASLExternal(resp string) ConnOption {
func SASLTypeExternal(resp string) SASLType {
return func(c *conn) error {
// make handlers map if no other mechanism has
if c.saslHandlers == nil {
@ -129,7 +132,7 @@ func ConnSASLExternal(resp string) ConnOption {
//
// SASL XOAUTH2 transmits the bearer in plain text and should only be used
// on TLS/SSL enabled connection.
func ConnSASLXOAUTH2(username, bearer string, saslMaxFrameSizeOverride uint32) ConnOption {
func SASLTypeXOAUTH2(username, bearer string, saslMaxFrameSizeOverride uint32) SASLType {
return func(c *conn) error {
// make handlers map if no other mechanism has
if c.saslHandlers == nil {

Просмотреть файл

@ -103,9 +103,10 @@ func TestConnSASLXOAUTH2AuthSuccess(t *testing.T) {
}
c := testconn.New(buf)
client, err := New(c,
ConnSASLXOAUTH2("someuser@example.com", "ya29.vF9dft4qmTc2Nvb3RlckBhdHRhdmlzdGEuY29tCg", 512),
ConnIdleTimeout(10*time.Minute))
client, err := New(c, &ConnOptions{
IdleTimeout: 10 * time.Minute,
SASLType: SASLTypeXOAUTH2("someuser@example.com", "ya29.vF9dft4qmTc2Nvb3RlckBhdHRhdmlzdGEuY29tCg", 512),
})
if err != nil {
t.Fatal(err)
}
@ -132,9 +133,10 @@ func TestConnSASLXOAUTH2AuthFail(t *testing.T) {
}
c := testconn.New(buf)
client, err := New(c,
ConnSASLXOAUTH2("someuser@example.com", "ya29.vF9dft4qmTc2Nvb3RlckBhdHRhdmlzdGEuY29tCg", 512),
ConnIdleTimeout(10*time.Minute))
client, err := New(c, &ConnOptions{
IdleTimeout: 10 * time.Minute,
SASLType: SASLTypeXOAUTH2("someuser@example.com", "ya29.vF9dft4qmTc2Nvb3RlckBhdHRhdmlzdGEuY29tCg", 512),
})
if err != nil {
defer client.Close()
}
@ -171,9 +173,10 @@ func TestConnSASLXOAUTH2AuthFailWithErrorResponse(t *testing.T) {
}
c := testconn.New(buf)
client, err := New(c,
ConnSASLXOAUTH2("someuser@example.com", "ya29.vF9dft4qmTc2Nvb3RlckBhdHRhdmlzdGEuY29tCg", 512),
ConnIdleTimeout(10*time.Minute))
client, err := New(c, &ConnOptions{
IdleTimeout: 10 * time.Minute,
SASLType: SASLTypeXOAUTH2("someuser@example.com", "ya29.vF9dft4qmTc2Nvb3RlckBhdHRhdmlzdGEuY29tCg", 512),
})
if err != nil {
defer client.Close()
}
@ -210,9 +213,10 @@ func TestConnSASLXOAUTH2AuthFailsAdditionalErrorResponse(t *testing.T) {
}
c := testconn.New(buf)
client, err := New(c,
ConnSASLXOAUTH2("someuser@example.com", "ya29.vF9dft4qmTc2Nvb3RlckBhdHRhdmlzdGEuY29tCg", 512),
ConnIdleTimeout(10*time.Minute))
client, err := New(c, &ConnOptions{
IdleTimeout: 10 * time.Minute,
SASLType: SASLTypeXOAUTH2("someuser@example.com", "ya29.vF9dft4qmTc2Nvb3RlckBhdHRhdmlzdGEuY29tCg", 512),
})
if err != nil {
defer client.Close()
}
@ -250,9 +254,10 @@ func TestConnSASLExternal(t *testing.T) {
}
c := testconn.New(buf)
client, err := New(c,
ConnSASLExternal(""),
ConnIdleTimeout(10*time.Minute))
client, err := New(c, &ConnOptions{
IdleTimeout: 10 * time.Minute,
SASLType: SASLTypeExternal(""),
})
if err != nil {
t.Fatal(err)
}

Просмотреть файл

@ -17,7 +17,7 @@ import (
func TestSenderInvalidOptions(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -67,7 +67,7 @@ func TestSenderMethodsNoSend(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -104,7 +104,7 @@ func TestSenderMethodsNoSend(t *testing.T) {
func TestSenderSendOnClosed(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -130,7 +130,7 @@ func TestSenderSendOnClosed(t *testing.T) {
func TestSenderSendOnSessionClosed(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -156,7 +156,7 @@ func TestSenderSendOnSessionClosed(t *testing.T) {
func TestSenderSendOnConnClosed(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -182,7 +182,7 @@ func TestSenderSendOnConnClosed(t *testing.T) {
func TestSenderSendOnDetached(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -238,7 +238,7 @@ func TestSenderAttachError(t *testing.T) {
}
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -286,7 +286,7 @@ func TestSenderAttachError(t *testing.T) {
func TestSenderSendMismatchedModes(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -329,7 +329,7 @@ func TestSenderSendSuccess(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -374,7 +374,7 @@ func TestSenderSendSettled(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -415,7 +415,7 @@ func TestSenderSendRejected(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -480,7 +480,7 @@ func TestSenderSendRejectedNoDetach(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -529,7 +529,7 @@ func TestSenderSendDetached(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -558,7 +558,7 @@ func TestSenderSendDetached(t *testing.T) {
func TestSenderSendTimeout(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -613,7 +613,7 @@ func TestSenderSendMsgTooBig(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -649,7 +649,7 @@ func TestSenderSendTagTooBig(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -721,7 +721,7 @@ func TestSenderSendMultiTransfer(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -752,7 +752,7 @@ func TestSenderSendMultiTransfer(t *testing.T) {
func TestSenderConnReaderError(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -785,7 +785,7 @@ func TestSenderConnReaderError(t *testing.T) {
func TestSenderConnWriterError(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -841,7 +841,7 @@ func TestSenderFlowFrameWithEcho(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)

Просмотреть файл

@ -44,7 +44,7 @@ func TestSessionClose(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
for i := 0; i < 4; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -77,7 +77,7 @@ func TestSessionServerClose(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -117,7 +117,7 @@ func TestSessionCloseTimeout(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -134,7 +134,7 @@ func TestSessionCloseTimeout(t *testing.T) {
func TestConnCloseSessionClose(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -155,7 +155,7 @@ func TestConnCloseSessionClose(t *testing.T) {
func TestSessionNewReceiverBadOptionFails(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -195,7 +195,7 @@ func TestSessionNewReceiverBatchingOneCredit(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -236,7 +236,7 @@ func TestSessionNewReceiverBatchingEnabled(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -275,7 +275,7 @@ func TestSessionNewReceiverMismatchedLinkName(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -297,7 +297,7 @@ func TestSessionNewReceiverMismatchedLinkName(t *testing.T) {
func TestSessionNewSenderBadOptionFails(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -335,7 +335,7 @@ func TestSessionNewSenderMismatchedLinkName(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -357,7 +357,7 @@ func TestSessionNewSenderMismatchedLinkName(t *testing.T) {
func TestSessionNewSenderDuplicateLinks(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -384,7 +384,7 @@ func TestSessionNewSenderDuplicateLinks(t *testing.T) {
func TestSessionNewSenderMaxHandles(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -411,7 +411,7 @@ func TestSessionNewSenderMaxHandles(t *testing.T) {
func TestSessionUnexpectedFrame(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -434,7 +434,7 @@ func TestSessionUnexpectedFrame(t *testing.T) {
func TestSessionInvalidFlowFrame(t *testing.T) {
netConn := mocks.NewNetConn(senderFrameHandlerNoUnhandled(ModeUnsettled))
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -492,7 +492,7 @@ func TestSessionFlowFrameWithEcho(t *testing.T) {
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -537,7 +537,7 @@ func TestSessionInvalidAttachDeadlock(t *testing.T) {
}
}
netConn := mocks.NewNetConn(responder)
client, err := New(netConn)
client, err := New(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)