This commit is contained in:
Kale Blankenship 2017-05-06 16:57:27 -07:00
Родитель 5851a7be18
Коммит f894f3c8f1
4 изменённых файлов: 219 добавлений и 84 удалений

127
client.go
Просмотреть файл

@ -59,8 +59,7 @@ func Dial(addr string, opts ...ConnOption) (*Client, error) {
return c, err
}
// New establishes an AMQP client connection on a pre-established
// net.Conn.
// New establishes an AMQP client connection over conn.
func New(conn net.Conn, opts ...ConnOption) (*Client, error) {
c, err := newConn(conn, opts...)
return &Client{conn: c}, err
@ -114,13 +113,13 @@ func (c *Client) NewSession() (*Session, error) {
//
// A session multiplexes Receivers.
type Session struct {
channel uint16
remoteChannel uint16
conn *conn
rx chan frame
channel uint16 // session's local channel
remoteChannel uint16 // session's remote channel
conn *conn // underlying conn
rx chan frame // frames destined for this session are sent on this chan by conn.mux
allocateHandle chan *link
deallocateHandle chan *link
allocateHandle chan *link // link handles are allocated by sending a link on this channel, nil is sent on link.rx once allocated
deallocateHandle chan *link // link handles are deallocated by sending a link on this channel
}
func newSession(c *conn, channel uint16) *Session {
@ -144,10 +143,11 @@ func (s *Session) Close() error {
}
}
// txFrame sends a frame to the connWriter
func (s *Session) txFrame(p frameBody) {
s.conn.wantWriteFrame(frame{
typ: frameTypeAMQP,
channel: s.channel,
channel: s.remoteChannel,
body: p,
})
}
@ -170,6 +170,7 @@ func (s *Session) NewReceiver(opts ...LinkOption) (*Receiver, error) {
batching: DefaultLinkBatching,
batchMaxAge: DefaultLinkBatchMaxAge,
}
// add receiver to link so it can be configured via
// link options.
l.receiver = r
@ -181,7 +182,13 @@ func (s *Session) NewReceiver(opts ...LinkOption) (*Receiver, error) {
return nil, err
}
}
// set creditUsed to linkCredit since no credit has been issued
// to remote yet
l.creditUsed = l.linkCredit
// buffer rx to linkCredit so that conn.mux won't block
// attempting to send to a slow reader
l.rx = make(chan frameBody, l.linkCredit)
// request handle from Session.mux
@ -198,15 +205,17 @@ func (s *Session) NewReceiver(opts ...LinkOption) (*Receiver, error) {
case <-l.rx:
}
// send Attach frame
s.txFrame(&performAttach{
Name: randString(),
Handle: l.handle,
Role: true,
Role: roleReceiver,
Source: &source{
Address: l.sourceAddr,
},
})
// wait for response
var fr frameBody
select {
case <-s.conn.done:
@ -218,8 +227,10 @@ func (s *Session) NewReceiver(opts ...LinkOption) (*Receiver, error) {
return nil, errorErrorf("unexpected attach response: %+v", fr)
}
l.senderDeliveryCount = resp.InitialDeliveryCount
// deliveryCount is a sequence number, must initialize to sender's initial sequence number
l.deliveryCount = resp.InitialDeliveryCount
// create dispositions channel and start dispositionBatcher if batching enabled
if r.batching {
// buffer dispositions chan to prevent disposition sends from blocking
r.dispositions = make(chan messageDisposition, l.linkCredit)
@ -230,23 +241,29 @@ func (s *Session) NewReceiver(opts ...LinkOption) (*Receiver, error) {
}
func (s *Session) mux() {
links := make(map[uint32]*link)
var nextHandle uint32
links := make(map[uint32]*link) // mapping of handles to links
var nextHandle uint32 // next handle # to be allocated
for {
select {
// conn has completed, exit
case <-s.conn.done:
return
case l := <-s.allocateHandle:
l.handle = nextHandle
links[nextHandle] = l
nextHandle++ // TODO: handle max session/wrapping
l.rx <- nil
// handle allocation request
case l := <-s.allocateHandle:
// TODO: handle max session/wrapping
l.handle = nextHandle // allocate handle to the link
links[nextHandle] = l // add to mapping
nextHandle++ // increment the next handle
l.rx <- nil // send nil on channel to indicate allocation complete
// handle deallocation request
case l := <-s.deallocateHandle:
delete(links, l.handle)
close(l.rx)
close(l.rx) // close channel to indicate deallocation
// incoming frame for link
case fr := <-s.rx:
handle, ok := fr.body.link()
if !ok {
@ -279,6 +296,13 @@ func (e ErrDetach) Error() string {
return fmt.Sprintf("link detached, reason: %+v", e.RemoteError)
}
// Default link options
const (
DefaultLinkCredit = 1
DefaultLinkBatching = true
DefaultLinkBatchMaxAge = 5 * time.Second
)
// link is a unidirectional route.
//
// May be used for sending or receiving, currently only receive implemented.
@ -290,21 +314,21 @@ type link struct {
session *Session // parent session
receiver *Receiver // allows link options to modify Receiver
creditUsed uint32 // currently used credits
senderDeliveryCount uint32 // number of messages sent/received
detachSent bool // we've sent a detach frame
detachReceived bool
err error // err returned on Close()
creditUsed uint32 // currently used credits
// "The delivery-count is initialized by the sender when a link endpoint is created,
// and is incremented whenever a message is sent. Only the sender MAY independently
// modify this field. The receiver's value is calculated based on the last known
// value from the sender and any subsequent messages received on the link. Note that,
// despite its name, the delivery-count is not a count but a sequence number
// initialized at an arbitrary point by the sender."
deliveryCount uint32
detachSent bool // we've sent a detach frame
detachReceived bool
err error // err returned on Close()
}
// Default link options
const (
DefaultLinkCredit = 1
DefaultLinkBatching = true
DefaultLinkBatchMaxAge = 5 * time.Second
)
// newLink is used by Session.mux to create new links
// newLink is used by Receiver to create new links
func newLink(s *Session) *link {
return &link{
linkCredit: DefaultLinkCredit,
@ -357,11 +381,14 @@ outer:
if fr, ok := fr.(*performDetach); ok && fr.Closed {
break outer
}
// connection has ended
// connection has ended
case <-l.session.conn.done:
l.err = l.session.conn.err
}
}
// deallocate handle
select {
case l.session.deallocateHandle <- l:
case <-l.session.conn.done:
@ -414,8 +441,8 @@ func LinkMatchMaxAge(d time.Duration) LinkOption {
// Receiver receives messages on a single AMQP link.
type Receiver struct {
link *link
buf bytes.Buffer
link *link // underlying link
buf bytes.Buffer // resable buffer for decoding multi frame messages
batching bool // enable batching of message dispositions
batchMaxAge time.Duration // maximum time between the start n batch and sending the batch to the server
dispositions chan messageDisposition // message dispositions are sent on this channel when batching is enabled
@ -424,16 +451,23 @@ type Receiver struct {
// sendFlow transmits a flow frame with enough credits to bring the sender's
// link credits up to l.link.linkCredit.
func (r *Receiver) sendFlow() {
// determine how much credit to issue to get sender back to linkCredit
newLinkCredit := r.link.linkCredit - (r.link.linkCredit - r.link.creditUsed)
r.link.senderDeliveryCount += r.link.creditUsed
// increment delivery count
r.link.deliveryCount += r.link.creditUsed
// send flow
r.link.session.txFrame(&performFlow{
IncomingWindow: 2147483647,
NextOutgoingID: 0,
OutgoingWindow: 0,
Handle: &r.link.handle,
DeliveryCount: &r.link.senderDeliveryCount,
DeliveryCount: &r.link.deliveryCount,
LinkCredit: &newLinkCredit,
})
// reset credit used
r.link.creditUsed = 0
}
@ -443,15 +477,17 @@ func (r *Receiver) sendFlow() {
func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
r.buf.Reset()
msg := &Message{receiver: r}
msg := &Message{receiver: r} // message to be decoded into
first := true
first := true // receiving the first frame of the message
outer:
for {
// if linkCredit is half used, send more
if r.link.creditUsed > r.link.linkCredit/2 {
r.sendFlow()
}
// wait for the next frame
var fr frameBody
select {
case <-r.link.session.conn.done:
@ -460,24 +496,34 @@ outer:
case <-ctx.Done():
return nil, ctx.Err()
}
switch fr := fr.(type) {
// message frame
case *performTransfer:
r.link.creditUsed++
// record the delivery ID if this is the first frame of the message
if first && fr.DeliveryID != nil {
msg.id = (deliveryID)(*fr.DeliveryID)
first = false
}
// add the payload the the buffer
r.buf.Write(fr.Payload)
// break out of loop if message is complete
if !fr.More {
break outer
}
// remote side is closing links
case *performDetach:
// don't currently support link detach and reattach
if !fr.Closed {
log.Panicf("non-closing detach not supported: %+v", fr)
return nil, errorErrorf("non-closing detach not supported: %+v", fr)
}
// set detach received and close link
r.link.detachReceived = true
r.link.close(ctx)
@ -485,6 +531,7 @@ outer:
}
}
// unmarshal message
_, err := unmarshal(&r.buf, msg)
return msg, err
}

31
conn.go
Просмотреть файл

@ -11,11 +11,12 @@ import (
"time"
)
// connection defaults
// Connection defaults
const (
defaultMaxFrameSize = 512
defaultChannelMax = 1
defaultIdleTimeout = 1 * time.Minute
DefaultMaxFrameSize = 512
DefaultIdleTimeout = 1 * time.Minute
defaultChannelMax = 1
)
// Errors
@ -156,10 +157,10 @@ type conn struct {
func newConn(netConn net.Conn, opts ...ConnOption) (*conn, error) {
c := &conn{
net: netConn,
maxFrameSize: defaultMaxFrameSize,
peerMaxFrameSize: defaultMaxFrameSize,
maxFrameSize: DefaultMaxFrameSize,
peerMaxFrameSize: DefaultMaxFrameSize,
channelMax: defaultChannelMax,
idleTimeout: defaultIdleTimeout,
idleTimeout: DefaultIdleTimeout,
done: make(chan struct{}),
connErr: make(chan error, 2), // buffered to ensure connReader/Writer won't leak
rxProto: make(chan protoHeader),
@ -275,7 +276,9 @@ func (c *conn) mux() {
}
}
// frameReader reads one frame at a time
// frameReader returns io.EOF on each read, this allows
// ReadFrom to work with a net.conn without blocking until
// the connection is closed
type frameReader struct {
r io.Reader // underlying reader
}
@ -454,20 +457,26 @@ func (c *conn) writeFrame(fr frame) error {
if c.connectTimeout != 0 {
c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
}
// writeFrame into txBuf
c.txBuf.Reset()
err := writeFrame(&c.txBuf, fr)
if err != nil {
return err
}
// validate we're not exceeding peer's max frame size
if uint64(c.txBuf.Len()) > uint64(c.peerMaxFrameSize) {
return errorErrorf("frame larger than peer ")
}
// write to network
_, err = c.net.Write(c.txBuf.Bytes())
return err
}
// writeProtoHeader writes an AMQP protocol header to the
// network
func (c *conn) writeProtoHeader(pID protoID) error {
if c.connectTimeout != 0 {
c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
@ -479,6 +488,8 @@ func (c *conn) writeProtoHeader(pID protoID) error {
// keepaliveFrame is an AMQP frame with no body, used for keepalives
var keepaliveFrame = []byte{0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}
// wantWriteFrame is used by sessions and links to send frame to
// connWriter.
func (c *conn) wantWriteFrame(fr frame) {
select {
case c.txFrame <- fr:
@ -549,7 +560,7 @@ func (c *conn) exchangeProtoHeader(pID protoID) stateFunc {
}
}
// readProtoHeader reads a protocol header packetc.rxProto.
// readProtoHeader reads a protocol header packet from c.rxProto.
func (c *conn) readProtoHeader() (protoHeader, error) {
var deadline <-chan time.Time
if c.connectTimeout != 0 {
@ -710,7 +721,7 @@ func (c *conn) saslOutcome() stateFunc {
// readFrame is used during connection establishment to read a single frame.
//
// After setup, Client.mux handles incoming frames.
// After setup, conn.mux handles incoming frames.
func (c *conn) readFrame() (frame, error) {
var deadline <-chan time.Time
if c.connectTimeout != 0 {

111
decode.go
Просмотреть файл

@ -116,96 +116,96 @@ type unmarshaler interface {
// map[interface{}]interface{}), will be decoded via conversion to the mapStringAny,
// mapSymbolAny, and mapAnyAny types.
//
// If the decoding function returns errNull, the null return value will
// If the decoding function returns errNull, the isNull return value will
// be true and err will be nil.
func unmarshal(r reader, i interface{}) (null bool, err error) {
func unmarshal(r reader, i interface{}) (isNull bool, err error) {
defer func() {
// prevent errNull from being passed up
if err == errNull {
null = true
isNull = true
err = nil
}
}()
switch t := i.(type) {
case unmarshaler:
return null, t.unmarshal(r)
return isNull, t.unmarshal(r)
case *int:
val, err := readInt(r)
if err != nil {
return null, err
return isNull, err
}
*t = val
case *uint64:
val, err := readUint(r)
if err != nil {
return null, err
return isNull, err
}
*t = uint64(val)
case *uint32:
val, err := readUint(r)
if err != nil {
return null, err
return isNull, err
}
*t = uint32(val)
case *uint16:
val, err := readUint(r)
if err != nil {
return null, err
return isNull, err
}
*t = uint16(val)
case *uint8:
val, err := readUint(r)
if err != nil {
return null, err
return isNull, err
}
*t = uint8(val)
case *string:
val, err := readString(r)
if err != nil {
return null, err
return isNull, err
}
*t = val
case *[]Symbol:
sa, err := readSymbolArray(r)
if err != nil {
return null, err
return isNull, err
}
*t = sa
case *Symbol:
s, err := readString(r)
if err != nil {
return null, err
return isNull, err
}
*t = Symbol(s)
case *[]byte:
val, err := readBinary(r)
if err != nil {
return null, err
return isNull, err
}
*t = val
case *bool:
b, err := readBool(r)
if err != nil {
return null, err
return isNull, err
}
*t = b
case *time.Time:
ts, err := readTimestamp(r)
if err != nil {
return null, err
return isNull, err
}
*t = ts
case *map[interface{}]interface{}:
return null, (*mapAnyAny)(t).unmarshal(r)
return isNull, (*mapAnyAny)(t).unmarshal(r)
case *map[string]interface{}:
return null, (*mapStringAny)(t).unmarshal(r)
return isNull, (*mapStringAny)(t).unmarshal(r)
case *map[Symbol]interface{}:
return null, (*mapSymbolAny)(t).unmarshal(r)
return isNull, (*mapSymbolAny)(t).unmarshal(r)
case *interface{}:
v, err := readAny(r)
if err != nil {
return null, err
return isNull, err
}
*t = v
default:
@ -217,9 +217,9 @@ func unmarshal(r reader, i interface{}) (null bool, err error) {
}
return unmarshal(r, indirect.Interface())
}
return null, errorErrorf("unable to unmarshal %T", i)
return isNull, errorErrorf("unable to unmarshal %T", i)
}
return null, nil
return isNull, nil
}
// unmarshalComposite is a helper for us in a composite's unmarshal() function.
@ -250,8 +250,7 @@ func unmarshalComposite(r reader, typ amqpType, fields ...unmarshalField) error
return errorWrapf(err, "unmarshaling field %d", i)
}
// If the field is null and handleNull is set,
// call it.
// If the field is null and handleNull is set, call it.
if null && fields[i].handleNull != nil {
err = fields[i].handleNull()
if err != nil {
@ -523,21 +522,71 @@ func readAny(r reader) (interface{}, error) {
}
switch amqpType(b) {
case typeCodeBool, typeCodeBoolTrue, typeCodeBoolFalse:
// bool
case
typeCodeBool,
typeCodeBoolTrue,
typeCodeBoolFalse:
return readBool(r)
case typeCodeUbyte, typeCodeUshort, typeCodeUint, typeCodeSmallUint, typeCodeUint0, typeCodeUlong, typeCodeSmallUlong, typeCodeUlong0:
// unsigned integers
case
typeCodeUbyte,
typeCodeUshort,
typeCodeUint,
typeCodeSmallUint,
typeCodeUint0,
typeCodeUlong,
typeCodeSmallUlong,
typeCodeUlong0:
return readUint(r)
case typeCodeByte, typeCodeShort, typeCodeInt, typeCodeSmallint, typeCodeLong, typeCodeSmalllong:
// signed integers
case
typeCodeByte,
typeCodeShort,
typeCodeInt,
typeCodeSmallint,
typeCodeLong,
typeCodeSmalllong:
return readInt(r)
case typeCodeFloat, typeCodeDouble, typeCodeDecimal32, typeCodeDecimal64, typeCodeDecimal128, typeCodeChar, typeCodeUUID,
typeCodeList0, typeCodeList8, typeCodeList32, typeCodeMap8, typeCodeMap32, typeCodeArray8, typeCodeArray32:
return nil, errorErrorf("%0x not implemented", b)
case typeCodeVbin8, typeCodeVbin32:
// binary
case
typeCodeVbin8,
typeCodeVbin32:
return readBinary(r)
case typeCodeStr8, typeCodeStr32, typeCodeSym8, typeCodeSym32:
// strings
case
typeCodeStr8,
typeCodeStr32,
typeCodeSym8,
typeCodeSym32:
return readString(r)
// timestamp
case typeCodeTimestamp:
return readTimestamp(r)
// not-implemented
case
typeCodeFloat,
typeCodeDouble,
typeCodeDecimal32,
typeCodeDecimal64,
typeCodeDecimal128,
typeCodeChar,
typeCodeUUID,
typeCodeList0,
typeCodeList8,
typeCodeList32,
typeCodeMap8,
typeCodeMap32,
typeCodeArray8,
typeCodeArray32:
return nil, errorErrorf("%0x not implemented", b)
default:
return nil, errorErrorf("unknown type %0x", b)
}

Просмотреть файл

@ -351,7 +351,7 @@ type performAttach struct {
//
// The role being played by the peer, i.e., whether the peer is the sender or the
// receiver of messages on the link.
Role bool // required, true=receiver / false=sender
Role role
// settlement policy for the sender
//
@ -495,6 +495,22 @@ func (a *performAttach) unmarshal(r reader) error {
}...)
}
type role bool
const (
roleSender role = false
roleReceiver role = true
)
func (rl *role) unmarshal(r reader) error {
_, err := unmarshal(r, (*bool)(rl))
return err
}
func (rl *role) marshal(wr writer) error {
return marshal(wr, (*bool)(rl))
}
type deliveryState interface{} // TODO: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#type-declared
type unsettled map[string]deliveryState
@ -1172,7 +1188,7 @@ type performDisposition struct {
//
// The role identifies whether the disposition frame contains information about
// sending link endpoints or receiving link endpoints.
Role bool // required, true=receiver / false=sender
Role role
// lower bound of deliveries
//
@ -1477,33 +1493,45 @@ func (m *Message) Release() {
// TODO: add support for sending Modified disposition
func (m *Message) unmarshal(r reader) error {
// loop, decoding sections until bytes have been consumed
for r.Len() > 0 {
// determine type
typ, err := peekMessageType(r.Bytes())
if err != nil {
return err
}
var (
section interface{}
section interface{}
// section header is read from r before
// unmarshaling section is set to true
discardHeader = true
)
switch amqpType(typ) {
case typeCodeMessageHeader:
discardHeader = false
section = &m.Header
case typeCodeDeliveryAnnotations:
section = &m.DeliveryAnnotations
case typeCodeMessageAnnotations:
section = &m.Annotations
case typeCodeMessageProperties:
discardHeader = false
section = &m.Properties
case typeCodeApplicationProperties:
section = &m.ApplicationProperties
case typeCodeApplicationData:
section = &m.Data
case typeCodeFooter:
section = &m.Footer
default:
return errorErrorf("unknown message section %x", typ)
}