2017-04-01 23:00:36 +03:00
|
|
|
package amqp
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
2022-11-18 01:44:40 +03:00
|
|
|
"context"
|
2017-04-23 04:32:50 +03:00
|
|
|
"crypto/tls"
|
2018-02-14 20:00:12 +03:00
|
|
|
"errors"
|
2021-09-02 23:42:44 +03:00
|
|
|
"fmt"
|
2017-04-30 02:38:15 +03:00
|
|
|
"math"
|
2017-04-01 23:00:36 +03:00
|
|
|
"net"
|
2021-09-20 20:55:31 +03:00
|
|
|
"net/url"
|
2017-04-27 06:35:29 +03:00
|
|
|
"sync"
|
2017-04-17 06:39:31 +03:00
|
|
|
"time"
|
2021-09-08 05:43:35 +03:00
|
|
|
|
2021-09-16 20:32:22 +03:00
|
|
|
"github.com/Azure/go-amqp/internal/bitmap"
|
2021-09-08 05:43:35 +03:00
|
|
|
"github.com/Azure/go-amqp/internal/buffer"
|
2022-10-26 01:33:27 +03:00
|
|
|
"github.com/Azure/go-amqp/internal/debug"
|
2021-09-09 20:24:27 +03:00
|
|
|
"github.com/Azure/go-amqp/internal/encoding"
|
2021-09-15 21:40:36 +03:00
|
|
|
"github.com/Azure/go-amqp/internal/frames"
|
2022-11-07 21:54:43 +03:00
|
|
|
"github.com/Azure/go-amqp/internal/shared"
|
2017-04-01 23:00:36 +03:00
|
|
|
)
|
|
|
|
|
2017-05-07 04:24:06 +03:00
|
|
|
// Default connection options
|
2017-04-01 23:00:36 +03:00
|
|
|
const (
|
2022-06-17 18:19:29 +03:00
|
|
|
defaultIdleTimeout = 1 * time.Minute
|
|
|
|
defaultMaxFrameSize = 65536
|
|
|
|
defaultMaxSessions = 65536
|
2017-04-30 02:38:15 +03:00
|
|
|
)
|
|
|
|
|
2022-06-25 00:30:04 +03:00
|
|
|
// ConnOptions contains the optional settings for configuring an AMQP connection.
|
|
|
|
type ConnOptions struct {
|
|
|
|
// ContainerID sets the container-id to use when opening the connection.
|
|
|
|
//
|
|
|
|
// A container ID will be randomly generated if this option is not used.
|
|
|
|
ContainerID string
|
|
|
|
|
|
|
|
// HostName sets the hostname sent in the AMQP
|
|
|
|
// Open frame and TLS ServerName (if not otherwise set).
|
|
|
|
HostName string
|
|
|
|
|
|
|
|
// IdleTimeout specifies the maximum period in milliseconds between
|
|
|
|
// receiving frames from the peer.
|
|
|
|
//
|
|
|
|
// Specify a value less than zero to disable idle timeout.
|
|
|
|
//
|
|
|
|
// Default: 1 minute.
|
|
|
|
IdleTimeout time.Duration
|
|
|
|
|
|
|
|
// MaxFrameSize sets the maximum frame size that
|
|
|
|
// the connection will accept.
|
|
|
|
//
|
|
|
|
// Must be 512 or greater.
|
|
|
|
//
|
|
|
|
// Default: 512.
|
|
|
|
MaxFrameSize uint32
|
|
|
|
|
|
|
|
// MaxSessions sets the maximum number of channels.
|
|
|
|
// The value must be greater than zero.
|
|
|
|
//
|
|
|
|
// Default: 65535.
|
|
|
|
MaxSessions uint16
|
|
|
|
|
|
|
|
// Properties sets an entry in the connection properties map sent to the server.
|
2022-11-17 04:53:58 +03:00
|
|
|
Properties map[string]any
|
2022-06-25 00:30:04 +03:00
|
|
|
|
|
|
|
// SASLType contains the specified SASL authentication mechanism.
|
|
|
|
SASLType SASLType
|
|
|
|
|
|
|
|
// Timeout configures how long to wait for the
|
|
|
|
// server during connection establishment.
|
|
|
|
//
|
|
|
|
// Once the connection has been established, IdleTimeout
|
|
|
|
// applies. If duration is zero, no timeout will be applied.
|
|
|
|
//
|
|
|
|
// Default: 0.
|
|
|
|
Timeout time.Duration
|
|
|
|
|
|
|
|
// TLSConfig sets the tls.Config to be used during
|
|
|
|
// TLS negotiation.
|
|
|
|
//
|
|
|
|
// This option is for advanced usage, in most scenarios
|
|
|
|
// providing a URL scheme of "amqps://" is sufficient.
|
|
|
|
TLSConfig *tls.Config
|
|
|
|
|
|
|
|
// test hook
|
|
|
|
dialer dialer
|
2018-08-01 05:50:14 +03:00
|
|
|
}
|
|
|
|
|
2022-11-18 01:44:40 +03:00
|
|
|
// Dial connects to an AMQP server.
|
|
|
|
//
|
|
|
|
// If the addr includes a scheme, it must be "amqp", "amqps", or "amqp+ssl".
|
|
|
|
// If no port is provided, 5672 will be used for "amqp" and 5671 for "amqps" or "amqp+ssl".
|
|
|
|
//
|
|
|
|
// If username and password information is not empty it's used as SASL PLAIN
|
|
|
|
// credentials, equal to passing ConnSASLPlain option.
|
|
|
|
//
|
|
|
|
// opts: pass nil to accept the default values.
|
|
|
|
func Dial(addr string, opts *ConnOptions) (*Conn, error) {
|
|
|
|
c, err := dialConn(addr, opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
err = c.start()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return c, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewConn establishes a new AMQP client connection over conn.
|
|
|
|
// opts: pass nil to accept the default values.
|
|
|
|
func NewConn(conn net.Conn, opts *ConnOptions) (*Conn, error) {
|
|
|
|
c, err := newConn(conn, opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
err = c.start()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return c, nil
|
2021-10-06 20:22:56 +03:00
|
|
|
}
|
|
|
|
|
2022-11-22 06:38:58 +03:00
|
|
|
// Conn is an AMQP connection.
|
2022-11-18 01:44:40 +03:00
|
|
|
type Conn struct {
|
2017-05-04 05:30:30 +03:00
|
|
|
net net.Conn // underlying connection
|
|
|
|
connectTimeout time.Duration // time to wait for reads/writes during conn establishment
|
2021-10-06 20:22:56 +03:00
|
|
|
dialer dialer // used for testing purposes, it allows faking dialing TCP/TLS endpoints
|
2017-04-23 19:42:48 +03:00
|
|
|
|
|
|
|
// TLS
|
2017-04-30 02:38:15 +03:00
|
|
|
tlsNegotiation bool // negotiate TLS
|
|
|
|
tlsComplete bool // TLS negotiation complete
|
2017-05-04 05:30:30 +03:00
|
|
|
tlsConfig *tls.Config // TLS config, default used if nil (ServerName set to Client.hostname)
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2017-04-30 02:38:15 +03:00
|
|
|
// SASL
|
2021-09-09 20:24:27 +03:00
|
|
|
saslHandlers map[encoding.Symbol]stateFunc // map of supported handlers keyed by SASL mechanism, SASL not negotiated if nil
|
2021-09-20 20:55:31 +03:00
|
|
|
saslComplete bool // SASL negotiation complete; internal *except* for SASL auth methods
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2017-04-30 02:38:15 +03:00
|
|
|
// local settings
|
2022-11-17 04:53:58 +03:00
|
|
|
maxFrameSize uint32 // max frame size to accept
|
|
|
|
channelMax uint16 // maximum number of channels to allow
|
|
|
|
hostname string // hostname of remote server (set explicitly or parsed from URL)
|
|
|
|
idleTimeout time.Duration // maximum period between receiving frames
|
|
|
|
properties map[encoding.Symbol]any // additional properties sent upon connection open
|
|
|
|
containerID string // set explicitly or randomly generated
|
2017-04-27 06:35:29 +03:00
|
|
|
|
2017-04-30 02:38:15 +03:00
|
|
|
// peer settings
|
|
|
|
peerIdleTimeout time.Duration // maximum period between sending frames
|
2022-11-18 01:44:40 +03:00
|
|
|
peerMaxFrameSize uint32 // maximum frame size peer will accept
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2017-04-30 02:38:15 +03:00
|
|
|
// conn state
|
2022-12-14 01:00:03 +03:00
|
|
|
done chan struct{} // indicates the connection has terminated
|
|
|
|
doneErr error // contains the error state returned from Close(); DO NOT TOUCH outside of conn.go until Done has been closed!
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// connReader and connWriter management
|
|
|
|
rxtxExit chan struct{} // signals connReader and connWriter to exit
|
|
|
|
closeOnce sync.Once // ensures that close() is only called once
|
2017-05-04 09:12:18 +03:00
|
|
|
|
2022-08-10 00:59:07 +03:00
|
|
|
// session tracking
|
|
|
|
channels *bitmap.Bitmap
|
|
|
|
sessionsByChannel map[uint16]*Session
|
|
|
|
sessionsByChannelMu sync.RWMutex
|
|
|
|
|
2017-05-04 09:12:18 +03:00
|
|
|
// connReader
|
2022-12-14 01:00:03 +03:00
|
|
|
rxBuf buffer.Buffer // incoming bytes buffer
|
|
|
|
rxDone chan struct{} // closed when connReader exits
|
|
|
|
rxErr error // contains last error reading from c.net; DO NOT TOUCH outside of connReader until rxDone has been closed!
|
2017-05-04 09:12:18 +03:00
|
|
|
|
|
|
|
// connWriter
|
2021-09-15 21:40:36 +03:00
|
|
|
txFrame chan frames.Frame // AMQP frames to be sent by connWriter
|
|
|
|
txBuf buffer.Buffer // buffer for marshaling frames before transmitting
|
2022-12-14 01:00:03 +03:00
|
|
|
txDone chan struct{} // closed when connWriter exits
|
|
|
|
txErr error // contains last error writing to c.net; DO NOT TOUCH outside of connWriter until txDone has been closed!
|
2017-05-04 09:12:18 +03:00
|
|
|
}
|
|
|
|
|
2022-11-18 01:44:40 +03:00
|
|
|
// used to abstract the underlying dialer for testing purposes
|
|
|
|
type dialer interface {
|
|
|
|
NetDialerDial(c *Conn, host, port string) error
|
|
|
|
TLSDialWithDialer(c *Conn, host, port string) error
|
|
|
|
}
|
|
|
|
|
2021-10-06 20:22:56 +03:00
|
|
|
// implements the dialer interface
|
|
|
|
type defaultDialer struct{}
|
|
|
|
|
2022-11-18 01:44:40 +03:00
|
|
|
func (defaultDialer) NetDialerDial(c *Conn, host, port string) (err error) {
|
2021-10-06 20:22:56 +03:00
|
|
|
dialer := &net.Dialer{Timeout: c.connectTimeout}
|
|
|
|
c.net, err = dialer.Dial("tcp", net.JoinHostPort(host, port))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-11-18 01:44:40 +03:00
|
|
|
func (defaultDialer) TLSDialWithDialer(c *Conn, host, port string) (err error) {
|
2021-10-06 20:22:56 +03:00
|
|
|
dialer := &net.Dialer{Timeout: c.connectTimeout}
|
|
|
|
c.net, err = tls.DialWithDialer(dialer, "tcp", net.JoinHostPort(host, port), c.tlsConfig)
|
|
|
|
return
|
|
|
|
}
|
2021-10-06 00:34:54 +03:00
|
|
|
|
2022-11-18 01:44:40 +03:00
|
|
|
func dialConn(addr string, opts *ConnOptions) (*Conn, error) {
|
2021-09-20 20:55:31 +03:00
|
|
|
u, err := url.Parse(addr)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
host, port := u.Hostname(), u.Port()
|
|
|
|
if port == "" {
|
|
|
|
port = "5672"
|
2022-02-09 01:49:35 +03:00
|
|
|
if u.Scheme == "amqps" || u.Scheme == "amqp+ssl" {
|
2021-09-20 20:55:31 +03:00
|
|
|
port = "5671"
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-06-25 00:30:04 +03:00
|
|
|
var cp ConnOptions
|
|
|
|
if opts != nil {
|
|
|
|
cp = *opts
|
|
|
|
}
|
|
|
|
|
2021-09-20 20:55:31 +03:00
|
|
|
// prepend SASL credentials when the user/pass segment is not empty
|
|
|
|
if u.User != nil {
|
|
|
|
pass, _ := u.User.Password()
|
2022-06-25 00:30:04 +03:00
|
|
|
cp.SASLType = SASLTypePlain(u.User.Username(), pass)
|
2021-09-20 20:55:31 +03:00
|
|
|
}
|
|
|
|
|
2022-06-25 00:30:04 +03:00
|
|
|
if cp.HostName == "" {
|
|
|
|
cp.HostName = host
|
|
|
|
}
|
2021-09-20 20:55:31 +03:00
|
|
|
|
2022-06-25 00:30:04 +03:00
|
|
|
c, err := newConn(nil, &cp)
|
2021-09-20 20:55:31 +03:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
switch u.Scheme {
|
|
|
|
case "amqp", "":
|
2021-10-06 20:22:56 +03:00
|
|
|
err = c.dialer.NetDialerDial(c, host, port)
|
2022-02-09 01:49:35 +03:00
|
|
|
case "amqps", "amqp+ssl":
|
2021-09-20 20:55:31 +03:00
|
|
|
c.initTLSConfig()
|
|
|
|
c.tlsNegotiation = false
|
2021-10-06 20:22:56 +03:00
|
|
|
err = c.dialer.TLSDialWithDialer(c, host, port)
|
2021-09-20 20:55:31 +03:00
|
|
|
default:
|
2021-10-06 20:22:56 +03:00
|
|
|
err = fmt.Errorf("unsupported scheme %q", u.Scheme)
|
2021-09-20 20:55:31 +03:00
|
|
|
}
|
2021-10-06 20:22:56 +03:00
|
|
|
|
2021-09-20 20:55:31 +03:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return c, nil
|
|
|
|
}
|
|
|
|
|
2022-11-18 01:44:40 +03:00
|
|
|
func newConn(netConn net.Conn, opts *ConnOptions) (*Conn, error) {
|
|
|
|
c := &Conn{
|
2022-08-10 00:59:07 +03:00
|
|
|
dialer: defaultDialer{},
|
|
|
|
net: netConn,
|
|
|
|
maxFrameSize: defaultMaxFrameSize,
|
2022-11-18 01:44:40 +03:00
|
|
|
peerMaxFrameSize: defaultMaxFrameSize,
|
2022-08-10 00:59:07 +03:00
|
|
|
channelMax: defaultMaxSessions - 1, // -1 because channel-max starts at zero
|
|
|
|
idleTimeout: defaultIdleTimeout,
|
2022-11-07 21:54:43 +03:00
|
|
|
containerID: shared.RandString(40),
|
2022-11-18 01:44:40 +03:00
|
|
|
done: make(chan struct{}),
|
2022-12-14 01:00:03 +03:00
|
|
|
rxtxExit: make(chan struct{}),
|
2022-08-10 00:59:07 +03:00
|
|
|
rxDone: make(chan struct{}),
|
|
|
|
txFrame: make(chan frames.Frame),
|
|
|
|
txDone: make(chan struct{}),
|
|
|
|
sessionsByChannel: map[uint16]*Session{},
|
2017-05-04 09:12:18 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
// apply options
|
2022-06-25 00:30:04 +03:00
|
|
|
if opts == nil {
|
|
|
|
opts = &ConnOptions{}
|
|
|
|
}
|
|
|
|
|
|
|
|
if opts.ContainerID != "" {
|
|
|
|
c.containerID = opts.ContainerID
|
|
|
|
}
|
|
|
|
if opts.HostName != "" {
|
|
|
|
c.hostname = opts.HostName
|
|
|
|
}
|
|
|
|
if opts.IdleTimeout > 0 {
|
|
|
|
c.idleTimeout = opts.IdleTimeout
|
|
|
|
} else if opts.IdleTimeout < 0 {
|
|
|
|
c.idleTimeout = 0
|
|
|
|
}
|
|
|
|
if opts.MaxFrameSize > 0 && opts.MaxFrameSize < 512 {
|
|
|
|
return nil, fmt.Errorf("invalid MaxFrameSize value %d", opts.MaxFrameSize)
|
|
|
|
} else if opts.MaxFrameSize > 512 {
|
|
|
|
c.maxFrameSize = opts.MaxFrameSize
|
|
|
|
}
|
|
|
|
if opts.MaxSessions > 0 {
|
|
|
|
c.channelMax = opts.MaxSessions
|
|
|
|
}
|
|
|
|
if opts.SASLType != nil {
|
|
|
|
if err := opts.SASLType(c); err != nil {
|
2017-05-04 09:12:18 +03:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
2022-06-25 00:30:04 +03:00
|
|
|
if opts.Timeout > 0 {
|
|
|
|
c.connectTimeout = opts.Timeout
|
|
|
|
}
|
|
|
|
if opts.Properties != nil {
|
2022-11-17 04:53:58 +03:00
|
|
|
c.properties = make(map[encoding.Symbol]any)
|
2022-06-25 00:30:04 +03:00
|
|
|
for key, val := range opts.Properties {
|
|
|
|
c.properties[encoding.Symbol(key)] = val
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if opts.TLSConfig != nil {
|
|
|
|
c.tlsConfig = opts.TLSConfig.Clone()
|
|
|
|
}
|
|
|
|
if opts.dialer != nil {
|
|
|
|
c.dialer = opts.dialer
|
|
|
|
}
|
2018-01-28 19:13:49 +03:00
|
|
|
return c, nil
|
|
|
|
}
|
|
|
|
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) initTLSConfig() {
|
2018-01-28 19:13:49 +03:00
|
|
|
// create a new config if not already set
|
|
|
|
if c.tlsConfig == nil {
|
|
|
|
c.tlsConfig = new(tls.Config)
|
|
|
|
}
|
2017-05-04 09:12:18 +03:00
|
|
|
|
2018-01-28 19:13:49 +03:00
|
|
|
// TLS config must have ServerName or InsecureSkipVerify set
|
|
|
|
if c.tlsConfig.ServerName == "" && !c.tlsConfig.InsecureSkipVerify {
|
|
|
|
c.tlsConfig.ServerName = c.hostname
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// start establishes the connection and begins multiplexing network IO.
|
2021-10-06 00:34:54 +03:00
|
|
|
// It is an error to call Start() on a connection that's been closed.
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) start() error {
|
2017-05-04 09:12:18 +03:00
|
|
|
// run connection establishment state machine
|
|
|
|
for state := c.negotiateProto; state != nil; {
|
2022-07-05 17:12:59 +03:00
|
|
|
var err error
|
|
|
|
state, err = state()
|
|
|
|
// check if err occurred
|
|
|
|
if err != nil {
|
|
|
|
close(c.txDone) // close here since connWriter hasn't been started yet
|
2022-12-14 01:00:03 +03:00
|
|
|
close(c.rxDone)
|
2022-07-05 17:12:59 +03:00
|
|
|
_ = c.Close()
|
|
|
|
return err
|
|
|
|
}
|
2017-05-04 09:12:18 +03:00
|
|
|
}
|
|
|
|
|
2022-08-10 00:59:07 +03:00
|
|
|
// we can't create the channel bitmap until the connection has been established.
|
|
|
|
// this is because our peer can tell us the max channels they support.
|
|
|
|
c.channels = bitmap.New(uint32(c.channelMax))
|
|
|
|
|
2017-05-04 09:56:55 +03:00
|
|
|
go c.connWriter()
|
2022-12-14 01:00:03 +03:00
|
|
|
go c.connReader()
|
2017-05-04 09:12:18 +03:00
|
|
|
|
2018-01-28 19:13:49 +03:00
|
|
|
return nil
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
|
2021-09-20 20:55:31 +03:00
|
|
|
// Close closes the connection.
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) Close() error {
|
2022-12-14 01:00:03 +03:00
|
|
|
c.close()
|
2022-11-22 06:38:58 +03:00
|
|
|
var connErr *ConnError
|
2022-12-14 01:00:03 +03:00
|
|
|
if errors.As(c.doneErr, &connErr) && connErr.RemoteErr == nil && connErr.inner == nil {
|
2022-03-04 23:28:45 +03:00
|
|
|
// an empty ConnectionError means the connection was closed by the caller
|
2018-02-14 20:00:12 +03:00
|
|
|
return nil
|
|
|
|
}
|
2022-12-14 01:00:03 +03:00
|
|
|
|
|
|
|
// there was an error during shut-down or connReader/connWriter
|
|
|
|
// experienced a terminal error
|
|
|
|
return c.doneErr
|
2017-11-13 07:51:38 +03:00
|
|
|
}
|
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// close is called once, either from Close() or when connReader/connWriter exits
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) close() {
|
2022-12-14 01:00:03 +03:00
|
|
|
c.closeOnce.Do(func() {
|
|
|
|
defer close(c.done)
|
2017-11-13 07:51:38 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
close(c.rxtxExit)
|
2017-11-13 07:51:38 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// wait for writing to stop, allows it to send the final close frame
|
|
|
|
<-c.txDone
|
2021-10-27 22:02:16 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
closeErr := c.net.Close()
|
2018-02-14 20:00:12 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// check rxDone after closing net, otherwise may block
|
|
|
|
// for up to c.idleTimeout
|
|
|
|
<-c.rxDone
|
2018-02-14 20:00:12 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
if errors.Is(c.rxErr, net.ErrClosed) {
|
|
|
|
// this is the expected error when the connection is closed, swallow it
|
|
|
|
c.rxErr = nil
|
|
|
|
}
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
if c.txErr == nil && c.rxErr == nil && closeErr == nil {
|
|
|
|
// if there are no errors, it means user initiated close() and we shut down cleanly
|
|
|
|
c.doneErr = &ConnError{}
|
|
|
|
} else if amqpErr, ok := c.rxErr.(*Error); ok {
|
|
|
|
// we experienced a peer-initiated close that contained an Error. return it
|
|
|
|
c.doneErr = &ConnError{RemoteErr: amqpErr}
|
|
|
|
} else if c.txErr != nil {
|
|
|
|
c.doneErr = &ConnError{inner: c.txErr}
|
|
|
|
} else if c.rxErr != nil {
|
|
|
|
c.doneErr = &ConnError{inner: c.rxErr}
|
|
|
|
} else {
|
|
|
|
c.doneErr = &ConnError{inner: closeErr}
|
|
|
|
}
|
|
|
|
})
|
2022-11-18 01:44:40 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Conn) NewSession(ctx context.Context, opts *SessionOptions) (*Session, error) {
|
|
|
|
session, err := c.newSession(opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := session.begin(ctx); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return session, nil
|
2017-11-13 07:51:38 +03:00
|
|
|
}
|
|
|
|
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) newSession(opts *SessionOptions) (*Session, error) {
|
2022-08-10 00:59:07 +03:00
|
|
|
c.sessionsByChannelMu.Lock()
|
|
|
|
defer c.sessionsByChannelMu.Unlock()
|
|
|
|
|
|
|
|
// create the next session to allocate
|
|
|
|
// note that channel always start at 0
|
|
|
|
channel, ok := c.channels.Next()
|
|
|
|
if !ok {
|
|
|
|
return nil, fmt.Errorf("reached connection channel max (%d)", c.channelMax)
|
|
|
|
}
|
2022-11-12 03:11:32 +03:00
|
|
|
session := newSession(c, uint16(channel), opts)
|
2022-08-10 00:59:07 +03:00
|
|
|
c.sessionsByChannel[session.channel] = session
|
2022-11-18 01:44:40 +03:00
|
|
|
|
2022-08-10 00:59:07 +03:00
|
|
|
return session, nil
|
|
|
|
}
|
|
|
|
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) deleteSession(s *Session) {
|
2022-08-10 00:59:07 +03:00
|
|
|
c.sessionsByChannelMu.Lock()
|
|
|
|
defer c.sessionsByChannelMu.Unlock()
|
|
|
|
|
|
|
|
delete(c.sessionsByChannel, s.channel)
|
|
|
|
c.channels.Remove(uint32(s.channel))
|
|
|
|
}
|
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// connReader reads from the net.Conn, decodes frames, and either handles
|
|
|
|
// them here as appropriate or sends them to the session.rx channel.
|
|
|
|
func (c *Conn) connReader() {
|
|
|
|
defer func() {
|
|
|
|
close(c.rxDone)
|
|
|
|
c.close()
|
|
|
|
}()
|
2017-04-27 06:35:29 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
var sessionsByRemoteChannel = make(map[uint16]*Session)
|
|
|
|
var err error
|
2017-04-01 23:00:36 +03:00
|
|
|
for {
|
2022-12-14 01:00:03 +03:00
|
|
|
if err != nil {
|
|
|
|
debug.Log(1, "connReader terminal error: %v", err)
|
|
|
|
c.rxErr = err
|
2017-04-24 06:24:12 +03:00
|
|
|
return
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
var fr frames.Frame
|
|
|
|
fr, err = c.readFrame()
|
|
|
|
if err != nil {
|
|
|
|
continue
|
|
|
|
}
|
2019-01-30 06:02:08 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
var (
|
|
|
|
session *Session
|
|
|
|
ok bool
|
|
|
|
)
|
|
|
|
|
|
|
|
switch body := fr.Body.(type) {
|
|
|
|
// Server initiated close.
|
|
|
|
case *frames.PerformClose:
|
|
|
|
// connWriter will send the close performative ack on its way out.
|
|
|
|
// it's a SHOULD though, not a MUST.
|
|
|
|
debug.Log(3, "RX (connReader): %s", body)
|
|
|
|
if body.Error == nil {
|
|
|
|
return
|
2018-02-08 08:26:49 +03:00
|
|
|
}
|
2022-12-14 01:00:03 +03:00
|
|
|
err = body.Error
|
|
|
|
continue
|
2018-02-08 08:26:49 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// RemoteChannel should be used when frame is Begin
|
|
|
|
case *frames.PerformBegin:
|
|
|
|
if body.RemoteChannel == nil {
|
|
|
|
// since we only support remotely-initiated sessions, this is an error
|
|
|
|
// TODO: it would be ideal to not have this kill the connection
|
|
|
|
err = fmt.Errorf("%T: nil RemoteChannel", fr.Body)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
c.sessionsByChannelMu.RLock()
|
|
|
|
session, ok = c.sessionsByChannel[*body.RemoteChannel]
|
|
|
|
c.sessionsByChannelMu.RUnlock()
|
2018-02-08 08:26:49 +03:00
|
|
|
if !ok {
|
2022-12-14 01:00:03 +03:00
|
|
|
err = fmt.Errorf("unexpected remote channel number %d", *body.RemoteChannel)
|
2018-02-08 08:26:49 +03:00
|
|
|
continue
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
2017-12-31 08:22:02 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
session.remoteChannel = fr.Channel
|
|
|
|
sessionsByRemoteChannel[fr.Channel] = session
|
|
|
|
|
|
|
|
case *frames.PerformEnd:
|
|
|
|
session, ok = sessionsByRemoteChannel[fr.Channel]
|
|
|
|
if !ok {
|
|
|
|
err = fmt.Errorf("%T: didn't find channel %d in sessionsByRemoteChannel (PerformEnd)", fr.Body, fr.Channel)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
// we MUST remove the remote channel from our map as soon as we receive
|
|
|
|
// the ack (i.e. before passing it on to the session mux) on the session
|
|
|
|
// ending since the numbers are recycled.
|
|
|
|
delete(sessionsByRemoteChannel, fr.Channel)
|
|
|
|
|
|
|
|
default:
|
|
|
|
// pass on performative to the correct session
|
|
|
|
session, ok = sessionsByRemoteChannel[fr.Channel]
|
|
|
|
if !ok {
|
|
|
|
err = fmt.Errorf("%T: didn't find channel %d in sessionsByRemoteChannel", fr.Body, fr.Channel)
|
|
|
|
continue
|
2017-12-31 08:22:02 +03:00
|
|
|
}
|
2022-12-14 01:00:03 +03:00
|
|
|
}
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
select {
|
|
|
|
case session.rx <- fr:
|
|
|
|
case <-c.rxtxExit:
|
2017-04-27 06:35:29 +03:00
|
|
|
return
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// readFrame reads a complete frame from c.net.
|
|
|
|
// it assumes that any read deadline has already been applied.
|
|
|
|
// used externally by SASL only.
|
|
|
|
func (c *Conn) readFrame() (frames.Frame, error) {
|
|
|
|
switch {
|
|
|
|
// Cheaply reuse free buffer space when fully read.
|
|
|
|
case c.rxBuf.Len() == 0:
|
|
|
|
c.rxBuf.Reset()
|
2017-11-13 07:51:38 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// Prevent excessive/unbounded growth by shifting data to beginning of buffer.
|
|
|
|
case int64(c.rxBuf.Size()) > int64(c.maxFrameSize):
|
|
|
|
c.rxBuf.Reclaim()
|
|
|
|
}
|
2017-04-24 06:24:12 +03:00
|
|
|
|
2017-04-30 02:38:15 +03:00
|
|
|
var (
|
2021-10-05 02:56:20 +03:00
|
|
|
currentHeader frames.Header // keep track of the current header, for frames split across multiple TCP packets
|
|
|
|
frameInProgress bool // true if in the middle of receiving data for currentHeader
|
2017-04-30 02:38:15 +03:00
|
|
|
)
|
2017-04-23 04:31:07 +03:00
|
|
|
|
2017-04-27 06:35:29 +03:00
|
|
|
for {
|
2017-05-07 05:10:33 +03:00
|
|
|
// need to read more if buf doesn't contain the complete frame
|
2017-05-01 08:02:53 +03:00
|
|
|
// or there's not enough in buf to parse the header
|
2022-12-14 01:00:03 +03:00
|
|
|
if frameInProgress || c.rxBuf.Len() < frames.HeaderSize {
|
|
|
|
// we MUST reset the idle timeout before each read from net.Conn
|
2018-06-01 06:34:23 +03:00
|
|
|
if c.idleTimeout > 0 {
|
|
|
|
_ = c.net.SetReadDeadline(time.Now().Add(c.idleTimeout))
|
|
|
|
}
|
2022-12-14 01:00:03 +03:00
|
|
|
err := c.rxBuf.ReadFromOnce(c.net)
|
2017-04-27 06:35:29 +03:00
|
|
|
if err != nil {
|
2022-12-14 01:00:03 +03:00
|
|
|
debug.Log(1, "readFrame error: %v", err)
|
|
|
|
return frames.Frame{}, err
|
2017-04-24 06:24:12 +03:00
|
|
|
}
|
2017-04-27 06:35:29 +03:00
|
|
|
}
|
2017-04-23 04:31:07 +03:00
|
|
|
|
2017-05-07 05:10:33 +03:00
|
|
|
// read more if buf doesn't contain enough to parse the header
|
2022-12-14 01:00:03 +03:00
|
|
|
if c.rxBuf.Len() < frames.HeaderSize {
|
2017-04-27 06:35:29 +03:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2017-05-07 05:10:33 +03:00
|
|
|
// parse the header if a frame isn't in progress
|
2017-04-27 06:35:29 +03:00
|
|
|
if !frameInProgress {
|
2017-05-01 08:02:53 +03:00
|
|
|
var err error
|
2022-12-14 01:00:03 +03:00
|
|
|
currentHeader, err = frames.ParseHeader(&c.rxBuf)
|
2017-04-27 06:35:29 +03:00
|
|
|
if err != nil {
|
2022-12-14 01:00:03 +03:00
|
|
|
return frames.Frame{}, err
|
2017-04-27 06:35:29 +03:00
|
|
|
}
|
|
|
|
frameInProgress = true
|
|
|
|
}
|
|
|
|
|
2017-05-01 08:02:53 +03:00
|
|
|
// check size is reasonable
|
2017-04-30 02:38:15 +03:00
|
|
|
if currentHeader.Size > math.MaxInt32 { // make max size configurable
|
2022-12-14 01:00:03 +03:00
|
|
|
return frames.Frame{}, errors.New("payload too large")
|
2017-04-30 02:38:15 +03:00
|
|
|
}
|
|
|
|
|
2021-10-05 02:56:20 +03:00
|
|
|
bodySize := int64(currentHeader.Size - frames.HeaderSize)
|
2017-04-30 02:38:15 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// the full frame hasn't been received, keep reading
|
|
|
|
if int64(c.rxBuf.Len()) < bodySize {
|
2017-04-27 06:35:29 +03:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
frameInProgress = false
|
|
|
|
|
2017-05-01 08:02:53 +03:00
|
|
|
// check if body is empty (keepalive)
|
2017-04-30 02:38:15 +03:00
|
|
|
if bodySize == 0 {
|
2022-12-14 01:00:03 +03:00
|
|
|
debug.Log(3, "received keep-alive frame")
|
2017-05-01 08:02:53 +03:00
|
|
|
continue
|
2017-04-30 02:38:15 +03:00
|
|
|
}
|
|
|
|
|
2017-05-01 08:02:53 +03:00
|
|
|
// parse the frame
|
2022-12-14 01:00:03 +03:00
|
|
|
b, ok := c.rxBuf.Next(bodySize)
|
2018-02-03 22:54:49 +03:00
|
|
|
if !ok {
|
2022-12-14 01:00:03 +03:00
|
|
|
return frames.Frame{}, fmt.Errorf("buffer EOF; requested bytes: %d, actual size: %d", bodySize, c.rxBuf.Len())
|
2018-02-03 22:54:49 +03:00
|
|
|
}
|
|
|
|
|
2021-10-05 02:56:20 +03:00
|
|
|
parsedBody, err := frames.ParseBody(buffer.New(b))
|
2017-04-27 06:35:29 +03:00
|
|
|
if err != nil {
|
2022-12-14 01:00:03 +03:00
|
|
|
return frames.Frame{}, err
|
2017-04-27 06:35:29 +03:00
|
|
|
}
|
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
return frames.Frame{Channel: currentHeader.Channel, Body: parsedBody}, nil
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) connWriter() {
|
2022-12-14 01:00:03 +03:00
|
|
|
defer func() {
|
|
|
|
close(c.txDone)
|
|
|
|
c.close()
|
|
|
|
}()
|
2017-11-13 07:51:38 +03:00
|
|
|
|
2019-06-20 17:50:15 +03:00
|
|
|
// disable write timeout
|
|
|
|
if c.connectTimeout != 0 {
|
|
|
|
c.connectTimeout = 0
|
|
|
|
_ = c.net.SetWriteDeadline(time.Time{})
|
|
|
|
}
|
|
|
|
|
2017-05-07 01:26:17 +03:00
|
|
|
var (
|
|
|
|
// keepalives are sent at a rate of 1/2 idle timeout
|
|
|
|
keepaliveInterval = c.peerIdleTimeout / 2
|
|
|
|
// 0 disables keepalives
|
|
|
|
keepalivesEnabled = keepaliveInterval > 0
|
|
|
|
// set if enable, nil if not; nil channels block forever
|
|
|
|
keepalive <-chan time.Time
|
|
|
|
)
|
2017-05-04 09:12:18 +03:00
|
|
|
|
2017-05-07 01:26:17 +03:00
|
|
|
if keepalivesEnabled {
|
|
|
|
ticker := time.NewTicker(keepaliveInterval)
|
2017-05-04 09:12:18 +03:00
|
|
|
defer ticker.Stop()
|
|
|
|
keepalive = ticker.C
|
|
|
|
}
|
|
|
|
|
|
|
|
var err error
|
|
|
|
for {
|
|
|
|
if err != nil {
|
2022-12-14 01:00:03 +03:00
|
|
|
debug.Log(1, "connWriter terminal error: %v", err)
|
|
|
|
c.txErr = err
|
2017-05-04 09:12:18 +03:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
// frame write request
|
|
|
|
case fr := <-c.txFrame:
|
2017-05-04 09:56:55 +03:00
|
|
|
err = c.writeFrame(fr)
|
2021-09-15 21:40:36 +03:00
|
|
|
if err == nil && fr.Done != nil {
|
|
|
|
close(fr.Done)
|
2017-12-31 08:22:02 +03:00
|
|
|
}
|
2017-05-04 09:12:18 +03:00
|
|
|
|
|
|
|
// keepalive timer
|
2017-05-07 01:26:17 +03:00
|
|
|
case <-keepalive:
|
2022-10-26 01:33:27 +03:00
|
|
|
debug.Log(3, "sending keep-alive frame")
|
2017-05-04 09:12:18 +03:00
|
|
|
_, err = c.net.Write(keepaliveFrame)
|
2017-05-07 01:26:17 +03:00
|
|
|
// It would be slightly more efficient in terms of network
|
|
|
|
// resources to reset the timer each time a frame is sent.
|
|
|
|
// However, keepalives are small (8 bytes) and the interval
|
|
|
|
// is usually on the order of minutes. It does not seem
|
|
|
|
// worth it to add extra operations in the write path to
|
|
|
|
// avoid. (To properly reset a timer it needs to be stopped,
|
|
|
|
// possibly drained, then reset.)
|
|
|
|
|
|
|
|
// connection complete
|
2022-12-14 01:00:03 +03:00
|
|
|
case <-c.rxtxExit:
|
|
|
|
// send close performative. note that the spec says we
|
|
|
|
// SHOULD wait for the ack but we don't HAVE to, in order
|
|
|
|
// to be resilient to bad actors etc. so we just send
|
|
|
|
// the close performative and exit.
|
2021-09-15 21:40:36 +03:00
|
|
|
cls := &frames.PerformClose{}
|
2022-10-26 01:33:27 +03:00
|
|
|
debug.Log(1, "TX (connWriter): %s", cls)
|
2022-12-14 01:00:03 +03:00
|
|
|
c.txErr = c.writeFrame(frames.Frame{
|
2022-11-07 21:54:43 +03:00
|
|
|
Type: frames.TypeAMQP,
|
2021-09-15 21:40:36 +03:00
|
|
|
Body: cls,
|
2017-11-13 07:51:38 +03:00
|
|
|
})
|
2017-05-04 09:56:55 +03:00
|
|
|
return
|
2017-05-04 09:12:18 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-20 20:55:31 +03:00
|
|
|
// writeFrame writes a frame to the network.
|
|
|
|
// used externally by SASL only.
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) writeFrame(fr frames.Frame) error {
|
2017-05-04 09:56:55 +03:00
|
|
|
if c.connectTimeout != 0 {
|
2018-02-17 19:24:03 +03:00
|
|
|
_ = c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
|
2017-05-04 09:56:55 +03:00
|
|
|
}
|
2017-05-07 02:57:27 +03:00
|
|
|
|
|
|
|
// writeFrame into txBuf
|
2021-09-08 05:43:35 +03:00
|
|
|
c.txBuf.Reset()
|
2022-11-07 23:55:24 +03:00
|
|
|
err := frames.Write(&c.txBuf, fr)
|
2017-05-04 09:56:55 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-05-07 05:10:33 +03:00
|
|
|
// validate the frame isn't exceeding peer's max frame size
|
2021-09-08 05:43:35 +03:00
|
|
|
requiredFrameSize := c.txBuf.Len()
|
2022-11-18 01:44:40 +03:00
|
|
|
if uint64(requiredFrameSize) > uint64(c.peerMaxFrameSize) {
|
|
|
|
return fmt.Errorf("%T frame size %d larger than peer's max frame size %d", fr, requiredFrameSize, c.peerMaxFrameSize)
|
2017-05-04 09:56:55 +03:00
|
|
|
}
|
|
|
|
|
2017-05-07 02:57:27 +03:00
|
|
|
// write to network
|
2022-11-22 06:38:58 +03:00
|
|
|
n, err := c.net.Write(c.txBuf.Bytes())
|
|
|
|
if l := c.txBuf.Len(); n > 0 && n < l && err != nil {
|
|
|
|
debug.Log(1, "wrote %d bytes less than len %d: %v", n, l, err)
|
|
|
|
}
|
2017-05-04 09:56:55 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-05-07 02:57:27 +03:00
|
|
|
// writeProtoHeader writes an AMQP protocol header to the
|
|
|
|
// network
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) writeProtoHeader(pID protoID) error {
|
2017-05-04 09:56:55 +03:00
|
|
|
if c.connectTimeout != 0 {
|
2018-02-17 19:24:03 +03:00
|
|
|
_ = c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
|
2017-05-04 09:56:55 +03:00
|
|
|
}
|
|
|
|
_, err := c.net.Write([]byte{'A', 'M', 'Q', 'P', byte(pID), 1, 0, 0})
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-05-04 09:12:18 +03:00
|
|
|
// keepaliveFrame is an AMQP frame with no body, used for keepalives
|
|
|
|
var keepaliveFrame = []byte{0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}
|
|
|
|
|
2021-09-20 20:55:31 +03:00
|
|
|
// SendFrame is used by sessions and links to send frames across the network.
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) sendFrame(fr frames.Frame) error {
|
2017-05-04 09:12:18 +03:00
|
|
|
select {
|
|
|
|
case c.txFrame <- fr:
|
2018-05-21 06:32:31 +03:00
|
|
|
return nil
|
2022-11-18 01:44:40 +03:00
|
|
|
case <-c.done:
|
2022-12-14 01:00:03 +03:00
|
|
|
return c.doneErr
|
2017-05-04 09:12:18 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-05-04 05:30:30 +03:00
|
|
|
// stateFunc is a state in a state machine.
|
|
|
|
//
|
|
|
|
// The state is advanced by returning the next state.
|
|
|
|
// The state machine concludes when nil is returned.
|
2022-07-05 17:12:59 +03:00
|
|
|
type stateFunc func() (stateFunc, error)
|
2017-05-04 05:30:30 +03:00
|
|
|
|
2021-09-20 20:55:31 +03:00
|
|
|
// negotiateProto determines which proto to negotiate next.
|
|
|
|
// used externally by SASL only.
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) negotiateProto() (stateFunc, error) {
|
2017-05-01 08:02:53 +03:00
|
|
|
// in the order each must be negotiated
|
2017-04-01 23:00:36 +03:00
|
|
|
switch {
|
2017-04-23 19:42:48 +03:00
|
|
|
case c.tlsNegotiation && !c.tlsComplete:
|
2017-04-23 21:01:44 +03:00
|
|
|
return c.exchangeProtoHeader(protoTLS)
|
2017-04-01 23:00:36 +03:00
|
|
|
case c.saslHandlers != nil && !c.saslComplete:
|
2017-04-23 21:01:44 +03:00
|
|
|
return c.exchangeProtoHeader(protoSASL)
|
2017-04-01 23:00:36 +03:00
|
|
|
default:
|
2017-04-23 21:01:44 +03:00
|
|
|
return c.exchangeProtoHeader(protoAMQP)
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-05-04 09:12:18 +03:00
|
|
|
type protoID uint8
|
|
|
|
|
2017-05-01 08:02:53 +03:00
|
|
|
// protocol IDs received in protoHeaders
|
2017-04-01 23:00:36 +03:00
|
|
|
const (
|
2017-05-04 09:12:18 +03:00
|
|
|
protoAMQP protoID = 0x0
|
|
|
|
protoTLS protoID = 0x2
|
|
|
|
protoSASL protoID = 0x3
|
2017-04-01 23:00:36 +03:00
|
|
|
)
|
|
|
|
|
2017-04-30 02:38:15 +03:00
|
|
|
// exchangeProtoHeader performs the round trip exchange of protocol
|
|
|
|
// headers, validation, and returns the protoID specific next state.
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) exchangeProtoHeader(pID protoID) (stateFunc, error) {
|
2017-05-01 08:02:53 +03:00
|
|
|
// write the proto header
|
2022-07-05 17:12:59 +03:00
|
|
|
if err := c.writeProtoHeader(pID); err != nil {
|
|
|
|
return nil, err
|
2017-05-04 09:56:55 +03:00
|
|
|
}
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2017-05-01 08:02:53 +03:00
|
|
|
// read response header
|
2017-05-07 01:26:17 +03:00
|
|
|
p, err := c.readProtoHeader()
|
|
|
|
if err != nil {
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, err
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
|
2017-05-04 09:12:18 +03:00
|
|
|
if pID != p.ProtoID {
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, fmt.Errorf("unexpected protocol header %#00x, expected %#00x", p.ProtoID, pID)
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
|
2017-05-01 08:02:53 +03:00
|
|
|
// go to the proto specific state
|
2017-05-04 09:12:18 +03:00
|
|
|
switch pID {
|
2017-04-23 21:01:44 +03:00
|
|
|
case protoAMQP:
|
2022-07-05 17:12:59 +03:00
|
|
|
return c.openAMQP, nil
|
2017-04-23 21:01:44 +03:00
|
|
|
case protoTLS:
|
2022-07-05 17:12:59 +03:00
|
|
|
return c.startTLS, nil
|
2017-04-23 21:01:44 +03:00
|
|
|
case protoSASL:
|
2022-07-05 17:12:59 +03:00
|
|
|
return c.negotiateSASL, nil
|
2017-04-01 23:00:36 +03:00
|
|
|
default:
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, fmt.Errorf("unknown protocol ID %#02x", p.ProtoID)
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-05-07 02:57:27 +03:00
|
|
|
// readProtoHeader reads a protocol header packet from c.rxProto.
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) readProtoHeader() (protoHeader, error) {
|
2022-12-14 01:00:03 +03:00
|
|
|
const protoHeaderSize = 8
|
|
|
|
|
|
|
|
// only read from the network once our buffer has been exhausted.
|
|
|
|
// TODO: this preserves existing behavior as some tests rely on this
|
|
|
|
// implementation detail (it lets you replay a stream of bytes). we
|
|
|
|
// might want to consider removing this and fixing the tests as the
|
|
|
|
// protocol doesn't actually work this way.
|
|
|
|
if c.rxBuf.Len() == 0 {
|
|
|
|
for {
|
|
|
|
if c.connectTimeout != 0 {
|
|
|
|
_ = c.net.SetReadDeadline(time.Now().Add(c.connectTimeout))
|
|
|
|
}
|
2017-04-30 05:33:03 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
err := c.rxBuf.ReadFromOnce(c.net)
|
|
|
|
if err != nil {
|
|
|
|
return protoHeader{}, err
|
|
|
|
}
|
2017-04-30 05:33:03 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// read more if buf doesn't contain enough to parse the header
|
|
|
|
if c.rxBuf.Len() >= protoHeaderSize {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
2018-02-12 02:26:24 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// reset outside the loop
|
2018-02-12 02:26:24 +03:00
|
|
|
if c.connectTimeout != 0 {
|
2022-12-14 01:00:03 +03:00
|
|
|
_ = c.net.SetReadDeadline(time.Time{})
|
2018-02-12 02:26:24 +03:00
|
|
|
}
|
2022-12-14 01:00:03 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
buf, ok := c.rxBuf.Next(protoHeaderSize)
|
|
|
|
if !ok {
|
|
|
|
return protoHeader{}, errors.New("invalid protoHeader")
|
|
|
|
}
|
|
|
|
// bounds check hint to compiler; see golang.org/issue/14808
|
|
|
|
_ = buf[protoHeaderSize-1]
|
|
|
|
|
|
|
|
if !bytes.Equal(buf[:4], []byte{'A', 'M', 'Q', 'P'}) {
|
|
|
|
return protoHeader{}, fmt.Errorf("unexpected protocol %q", buf[:4])
|
|
|
|
}
|
2018-02-12 02:26:24 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
p := protoHeader{
|
|
|
|
ProtoID: protoID(buf[4]),
|
|
|
|
Major: buf[5],
|
|
|
|
Minor: buf[6],
|
|
|
|
Revision: buf[7],
|
2017-05-04 05:30:30 +03:00
|
|
|
}
|
2018-02-12 02:26:24 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
if p.Major != 1 || p.Minor != 0 || p.Revision != 0 {
|
|
|
|
return protoHeader{}, fmt.Errorf("unexpected protocol version %d.%d.%d", p.Major, p.Minor, p.Revision)
|
|
|
|
}
|
|
|
|
|
|
|
|
return p, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// startTLS wraps the conn with TLS and returns to Client.negotiateProto
|
|
|
|
func (c *Conn) startTLS() (stateFunc, error) {
|
|
|
|
c.initTLSConfig()
|
|
|
|
|
|
|
|
_ = c.net.SetReadDeadline(time.Time{}) // clear timeout
|
2018-02-12 02:26:24 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// wrap existing net.Conn and perform TLS handshake
|
|
|
|
tlsConn := tls.Client(c.net, c.tlsConfig)
|
|
|
|
if err := tlsConn.Handshake(); err != nil {
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, err
|
2017-04-30 05:33:03 +03:00
|
|
|
}
|
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// swap net.Conn
|
|
|
|
c.net = tlsConn
|
|
|
|
c.tlsComplete = true
|
|
|
|
|
2017-05-01 08:02:53 +03:00
|
|
|
// go to next protocol
|
2022-07-05 17:12:59 +03:00
|
|
|
return c.negotiateProto, nil
|
2017-04-23 04:32:50 +03:00
|
|
|
}
|
|
|
|
|
2017-04-30 02:38:15 +03:00
|
|
|
// openAMQP round trips the AMQP open performative
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) openAMQP() (stateFunc, error) {
|
2017-05-01 08:02:53 +03:00
|
|
|
// send open frame
|
2021-09-15 21:40:36 +03:00
|
|
|
open := &frames.PerformOpen{
|
2019-12-02 21:06:02 +03:00
|
|
|
ContainerID: c.containerID,
|
|
|
|
Hostname: c.hostname,
|
|
|
|
MaxFrameSize: c.maxFrameSize,
|
|
|
|
ChannelMax: c.channelMax,
|
2021-09-20 17:22:47 +03:00
|
|
|
IdleTimeout: c.idleTimeout / 2, // per spec, advertise half our idle timeout
|
2019-12-02 21:06:02 +03:00
|
|
|
Properties: c.properties,
|
|
|
|
}
|
2022-10-26 01:33:27 +03:00
|
|
|
debug.Log(1, "TX (openAMQP): %s", open)
|
2022-07-05 17:12:59 +03:00
|
|
|
err := c.writeFrame(frames.Frame{
|
2022-11-07 21:54:43 +03:00
|
|
|
Type: frames.TypeAMQP,
|
2021-09-15 21:40:36 +03:00
|
|
|
Body: open,
|
|
|
|
Channel: 0,
|
2017-04-22 22:56:08 +03:00
|
|
|
})
|
2022-07-05 17:12:59 +03:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2017-05-04 09:56:55 +03:00
|
|
|
}
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2017-05-01 08:02:53 +03:00
|
|
|
// get the response
|
2022-12-14 01:00:03 +03:00
|
|
|
fr, err := c.readSingleFrame()
|
2017-04-01 23:00:36 +03:00
|
|
|
if err != nil {
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, err
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
2021-09-15 21:40:36 +03:00
|
|
|
o, ok := fr.Body.(*frames.PerformOpen)
|
2017-04-24 06:24:12 +03:00
|
|
|
if !ok {
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, fmt.Errorf("openAMQP: unexpected frame type %T", fr.Body)
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
2022-10-26 01:33:27 +03:00
|
|
|
debug.Log(1, "RX (openAMQP): %s", o)
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2017-05-01 08:02:53 +03:00
|
|
|
// update peer settings
|
2017-04-27 06:35:29 +03:00
|
|
|
if o.MaxFrameSize > 0 {
|
2022-11-18 01:44:40 +03:00
|
|
|
c.peerMaxFrameSize = o.MaxFrameSize
|
2017-04-27 06:35:29 +03:00
|
|
|
}
|
2017-04-23 21:01:44 +03:00
|
|
|
if o.IdleTimeout > 0 {
|
2017-05-07 01:26:17 +03:00
|
|
|
// TODO: reject very small idle timeouts
|
2017-04-30 02:38:15 +03:00
|
|
|
c.peerIdleTimeout = o.IdleTimeout
|
2017-04-17 06:39:31 +03:00
|
|
|
}
|
2017-04-01 23:00:36 +03:00
|
|
|
if o.ChannelMax < c.channelMax {
|
|
|
|
c.channelMax = o.ChannelMax
|
|
|
|
}
|
|
|
|
|
2017-05-01 08:02:53 +03:00
|
|
|
// connection established, exit state machine
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, nil
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
|
2017-04-30 02:38:15 +03:00
|
|
|
// negotiateSASL returns the SASL handler for the first matched
|
|
|
|
// mechanism specified by the server
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) negotiateSASL() (stateFunc, error) {
|
2017-05-03 04:49:31 +03:00
|
|
|
// read mechanisms frame
|
2022-12-14 01:00:03 +03:00
|
|
|
fr, err := c.readSingleFrame()
|
2017-04-01 23:00:36 +03:00
|
|
|
if err != nil {
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, err
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
2021-09-15 21:40:36 +03:00
|
|
|
sm, ok := fr.Body.(*frames.SASLMechanisms)
|
2017-04-24 06:24:12 +03:00
|
|
|
if !ok {
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, fmt.Errorf("negotiateSASL: unexpected frame type %T", fr.Body)
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
2022-10-26 01:33:27 +03:00
|
|
|
debug.Log(1, "RX (negotiateSASL): %s", sm)
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2017-05-03 04:49:31 +03:00
|
|
|
// return first match in c.saslHandlers based on order received
|
2017-04-01 23:00:36 +03:00
|
|
|
for _, mech := range sm.Mechanisms {
|
|
|
|
if state, ok := c.saslHandlers[mech]; ok {
|
2022-07-05 17:12:59 +03:00
|
|
|
return state, nil
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-05-03 04:49:31 +03:00
|
|
|
// no match
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, fmt.Errorf("no supported auth mechanism (%v)", sm.Mechanisms) // TODO: send "auth not supported" frame?
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
|
2017-05-04 05:30:30 +03:00
|
|
|
// saslOutcome processes the SASL outcome frame and return Client.negotiateProto
|
2017-04-30 02:38:15 +03:00
|
|
|
// on success.
|
|
|
|
//
|
|
|
|
// SASL handlers return this stateFunc when the mechanism specific negotiation
|
|
|
|
// has completed.
|
2021-09-20 20:55:31 +03:00
|
|
|
// used externally by SASL only.
|
2022-11-18 01:44:40 +03:00
|
|
|
func (c *Conn) saslOutcome() (stateFunc, error) {
|
2017-05-03 04:49:31 +03:00
|
|
|
// read outcome frame
|
2022-12-14 01:00:03 +03:00
|
|
|
fr, err := c.readSingleFrame()
|
2017-04-01 23:00:36 +03:00
|
|
|
if err != nil {
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, err
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
2021-09-15 21:40:36 +03:00
|
|
|
so, ok := fr.Body.(*frames.SASLOutcome)
|
2017-04-24 06:24:12 +03:00
|
|
|
if !ok {
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, fmt.Errorf("saslOutcome: unexpected frame type %T", fr.Body)
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
2022-10-26 01:33:27 +03:00
|
|
|
debug.Log(1, "RX (saslOutcome): %s", so)
|
2017-04-01 23:00:36 +03:00
|
|
|
|
2017-05-03 04:49:31 +03:00
|
|
|
// check if auth succeeded
|
2021-09-15 21:40:36 +03:00
|
|
|
if so.Code != encoding.CodeSASLOK {
|
2022-07-05 17:12:59 +03:00
|
|
|
return nil, fmt.Errorf("SASL PLAIN auth failed with code %#00x: %s", so.Code, so.AdditionalData) // implement Stringer for so.Code
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
|
|
|
|
2017-05-03 04:49:31 +03:00
|
|
|
// return to c.negotiateProto
|
2017-04-01 23:00:36 +03:00
|
|
|
c.saslComplete = true
|
2022-07-05 17:12:59 +03:00
|
|
|
return c.negotiateProto, nil
|
2017-04-01 23:00:36 +03:00
|
|
|
}
|
2017-04-24 06:24:12 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
// readSingleFrame is used during connection establishment to read a single frame.
|
2017-04-30 02:38:15 +03:00
|
|
|
//
|
2022-12-14 01:00:03 +03:00
|
|
|
// After setup, conn.connReader handles incoming frames.
|
|
|
|
func (c *Conn) readSingleFrame() (frames.Frame, error) {
|
2017-05-04 05:30:30 +03:00
|
|
|
if c.connectTimeout != 0 {
|
2022-12-14 01:00:03 +03:00
|
|
|
_ = c.net.SetDeadline(time.Now().Add(c.connectTimeout))
|
|
|
|
defer func() { _ = c.net.SetDeadline(time.Time{}) }()
|
2017-05-04 05:30:30 +03:00
|
|
|
}
|
2017-05-04 09:12:18 +03:00
|
|
|
|
2022-12-14 01:00:03 +03:00
|
|
|
fr, err := c.readFrame()
|
|
|
|
if err != nil {
|
|
|
|
return frames.Frame{}, err
|
2017-04-24 06:24:12 +03:00
|
|
|
}
|
2022-12-14 01:00:03 +03:00
|
|
|
|
|
|
|
return fr, nil
|
2017-04-24 06:24:12 +03:00
|
|
|
}
|
2021-09-08 20:27:48 +03:00
|
|
|
|
|
|
|
type protoHeader struct {
|
|
|
|
ProtoID protoID
|
|
|
|
Major uint8
|
|
|
|
Minor uint8
|
|
|
|
Revision uint8
|
|
|
|
}
|