Move frame types to internal/frames package (#62)

Exported fields on the Frame type.
This required moving some dependent constants to the encoding package.
ReadBool no longer needs to be exported from encoding.
Refactored value() methods on a few constant types to avoid them from
being part of public surface area.
This commit is contained in:
Joel Hendrix 2021-09-15 11:40:36 -07:00 коммит произвёл GitHub
Родитель ea76cacf01
Коммит 7fe27c31cd
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
18 изменённых файлов: 684 добавлений и 602 удалений

Просмотреть файл

@ -11,6 +11,9 @@ import (
"net/url"
"sync"
"time"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
var (
@ -127,7 +130,7 @@ func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
}
// send Begin to server
begin := &performBegin{
begin := &frames.PerformBegin{
NextOutgoingID: 0,
IncomingWindow: s.incomingWindow,
OutgoingWindow: s.outgoingWindow,
@ -137,18 +140,18 @@ func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
_ = s.txFrame(begin, nil)
// wait for response
var fr frame
var fr frames.Frame
select {
case <-c.conn.done:
return nil, c.conn.getErr()
case fr = <-s.rx:
}
debug(1, "RX: %s", fr.body)
debug(1, "RX: %s", fr.Body)
begin, ok := fr.body.(*performBegin)
begin, ok := fr.Body.(*frames.PerformBegin)
if !ok {
_ = s.Close(context.Background()) // deallocate session on error
return nil, fmt.Errorf("unexpected begin response: %+v", fr.body)
return nil, fmt.Errorf("unexpected begin response: %+v", fr.Body)
}
// start Session multiplexor
@ -255,7 +258,7 @@ const (
// to a boolean flag indicating the direction of the link.
type linkKey struct {
name string
role role // Local role: sender/receiver
role encoding.Role // Local role: sender/receiver
}
const maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader

Просмотреть файл

@ -5,6 +5,7 @@ import (
"testing"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
func TestLinkOptions(t *testing.T) {
@ -12,7 +13,7 @@ func TestLinkOptions(t *testing.T) {
label string
opts []LinkOption
wantSource *source
wantSource *frames.Source
wantProperties map[encoding.Symbol]interface{}
}{
{
@ -30,7 +31,7 @@ func TestLinkOptions(t *testing.T) {
LinkSourceFilter("com.microsoft:session-filter", 0x00000137000000C, "123"),
},
wantSource: &source{
wantSource: &frames.Source{
Filter: map[encoding.Symbol]*encoding.DescribedType{
"apache.org:selector-filter:string": {
Descriptor: binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x46, 0x8C, 0x00, 0x00, 0x00, 0x04}),
@ -55,7 +56,7 @@ func TestLinkOptions(t *testing.T) {
LinkSourceFilter("com.microsoft:session-filter", 0x00000137000000C, nil),
},
wantSource: &source{
wantSource: &frames.Source{
Filter: map[encoding.Symbol]*encoding.DescribedType{
"com.microsoft:session-filter": {
Descriptor: binary.BigEndian.Uint64([]byte{0x00, 0x00, 0x00, 0x13, 0x70, 0x00, 0x00, 0x0C}),
@ -69,7 +70,7 @@ func TestLinkOptions(t *testing.T) {
opts: []LinkOption{
LinkSourceCapabilities("cap1", "cap2", "cap3"),
},
wantSource: &source{
wantSource: &frames.Source{
Capabilities: []encoding.Symbol{"cap1", "cap2", "cap3"},
},
},

103
conn.go
Просмотреть файл

@ -14,6 +14,7 @@ import (
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
// Default connection options
@ -203,12 +204,12 @@ type conn struct {
// connReader
rxProto chan protoHeader // protoHeaders received by connReader
rxFrame chan frame // AMQP frames received by connReader
rxFrame chan frames.Frame // AMQP frames received by connReader
rxDone chan struct{}
connReaderRun chan func() // functions to be run by conn reader (set deadline on conn to run)
// connWriter
txFrame chan frame // AMQP frames to be sent by connWriter
txFrame chan frames.Frame // AMQP frames to be sent by connWriter
txBuf buffer.Buffer // buffer for marshaling frames before transmitting
txDone chan struct{}
}
@ -230,12 +231,12 @@ func newConn(netConn net.Conn, opts ...ConnOption) (*conn, error) {
connErr: make(chan error, 2), // buffered to ensure connReader/Writer won't leak
closeMux: make(chan struct{}),
rxProto: make(chan protoHeader),
rxFrame: make(chan frame),
rxFrame: make(chan frames.Frame),
rxDone: make(chan struct{}),
connReaderRun: make(chan func(), 1), // buffered to allow queueing function before interrupt
newSession: make(chan newSessionResp),
delSession: make(chan *Session),
txFrame: make(chan frame),
txFrame: make(chan frames.Frame),
txDone: make(chan struct{}),
}
@ -365,9 +366,9 @@ func (c *conn) mux() {
ok bool
)
switch body := fr.body.(type) {
switch body := fr.Body.(type) {
// Server initiated close.
case *performClose:
case *frames.PerformClose:
if body.Error != nil {
c.err = body.Error
} else {
@ -376,7 +377,7 @@ func (c *conn) mux() {
return
// RemoteChannel should be used when frame is Begin
case *performBegin:
case *frames.PerformBegin:
if body.RemoteChannel == nil {
break
}
@ -385,15 +386,15 @@ func (c *conn) mux() {
break
}
session.remoteChannel = fr.channel
sessionsByRemoteChannel[fr.channel] = session
session.remoteChannel = fr.Channel
sessionsByRemoteChannel[fr.Channel] = session
default:
session, ok = sessionsByRemoteChannel[fr.channel]
session, ok = sessionsByRemoteChannel[fr.Channel]
}
if !ok {
c.err = fmt.Errorf("unexpected frame: %#v", fr.body)
c.err = fmt.Errorf("unexpected frame: %#v", fr.Body)
continue
}
@ -567,7 +568,7 @@ func (c *conn) connReader() {
select {
case <-c.done:
return
case c.rxFrame <- frame{channel: currentHeader.Channel, body: parsedBody}:
case c.rxFrame <- frames.Frame{Channel: currentHeader.Channel, Body: parsedBody}:
}
}
}
@ -607,8 +608,8 @@ func (c *conn) connWriter() {
// frame write request
case fr := <-c.txFrame:
err = c.writeFrame(fr)
if err == nil && fr.done != nil {
close(fr.done)
if err == nil && fr.Done != nil {
close(fr.Done)
}
// keepalive timer
@ -625,11 +626,11 @@ func (c *conn) connWriter() {
// connection complete
case <-c.done:
// send close
cls := &performClose{}
cls := &frames.PerformClose{}
debug(1, "TX: %s", cls)
_ = c.writeFrame(frame{
type_: frameTypeAMQP,
body: cls,
_ = c.writeFrame(frames.Frame{
Type: frameTypeAMQP,
Body: cls,
})
return
}
@ -638,7 +639,7 @@ func (c *conn) connWriter() {
// writeFrame writes a frame to the network, may only be used
// by connWriter after initial negotiation.
func (c *conn) writeFrame(fr frame) error {
func (c *conn) writeFrame(fr frames.Frame) error {
if c.connectTimeout != 0 {
_ = c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
}
@ -676,7 +677,7 @@ var keepaliveFrame = []byte{0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}
// wantWriteFrame is used by sessions and links to send frame to
// connWriter.
func (c *conn) wantWriteFrame(fr frame) error {
func (c *conn) wantWriteFrame(fr frames.Frame) error {
select {
case c.txFrame <- fr:
return nil
@ -807,7 +808,7 @@ func (c *conn) startTLS() stateFunc {
// openAMQP round trips the AMQP open performative
func (c *conn) openAMQP() stateFunc {
// send open frame
open := &performOpen{
open := &frames.PerformOpen{
ContainerID: c.containerID,
Hostname: c.hostname,
MaxFrameSize: c.maxFrameSize,
@ -816,10 +817,10 @@ func (c *conn) openAMQP() stateFunc {
Properties: c.properties,
}
debug(1, "TX: %s", open)
c.err = c.writeFrame(frame{
type_: frameTypeAMQP,
body: open,
channel: 0,
c.err = c.writeFrame(frames.Frame{
Type: frameTypeAMQP,
Body: open,
Channel: 0,
})
if c.err != nil {
return nil
@ -831,9 +832,9 @@ func (c *conn) openAMQP() stateFunc {
c.err = err
return nil
}
o, ok := fr.body.(*performOpen)
o, ok := fr.Body.(*frames.PerformOpen)
if !ok {
c.err = fmt.Errorf("unexpected frame type %T", fr.body)
c.err = fmt.Errorf("unexpected frame type %T", fr.Body)
return nil
}
debug(1, "RX: %s", o)
@ -863,9 +864,9 @@ func (c *conn) negotiateSASL() stateFunc {
c.err = err
return nil
}
sm, ok := fr.body.(*saslMechanisms)
sm, ok := fr.Body.(*frames.SASLMechanisms)
if !ok {
c.err = fmt.Errorf("unexpected frame type %T", fr.body)
c.err = fmt.Errorf("unexpected frame type %T", fr.Body)
return nil
}
debug(1, "RX: %s", sm)
@ -894,15 +895,15 @@ func (c *conn) saslOutcome() stateFunc {
c.err = err
return nil
}
so, ok := fr.body.(*saslOutcome)
so, ok := fr.Body.(*frames.SASLOutcome)
if !ok {
c.err = fmt.Errorf("unexpected frame type %T", fr.body)
c.err = fmt.Errorf("unexpected frame type %T", fr.Body)
return nil
}
debug(1, "RX: %s", so)
// check if auth succeeded
if so.Code != codeSASLOK {
if so.Code != encoding.CodeSASLOK {
c.err = fmt.Errorf("SASL PLAIN auth failed with code %#00x: %s", so.Code, so.AdditionalData) // implement Stringer for so.Code
return nil
}
@ -915,13 +916,13 @@ func (c *conn) saslOutcome() stateFunc {
// readFrame is used during connection establishment to read a single frame.
//
// After setup, conn.mux handles incoming frames.
func (c *conn) readFrame() (frame, error) {
func (c *conn) readFrame() (frames.Frame, error) {
var deadline <-chan time.Time
if c.connectTimeout != 0 {
deadline = time.After(c.connectTimeout)
}
var fr frame
var fr frames.Frame
select {
case fr = <-c.rxFrame:
return fr, nil
@ -1020,7 +1021,7 @@ func parseProtoHeader(r *buffer.Buffer) (protoHeader, error) {
}
// parseFrameBody reads and unmarshals an AMQP frame.
func parseFrameBody(r *buffer.Buffer) (frameBody, error) {
func parseFrameBody(r *buffer.Buffer) (frames.FrameBody, error) {
payload := r.Bytes()
if r.Len() < 3 || payload[0] != 0 || encoding.AMQPType(payload[1]) != encoding.TypeCodeSmallUlong {
@ -1029,51 +1030,51 @@ func parseFrameBody(r *buffer.Buffer) (frameBody, error) {
switch pType := encoding.AMQPType(payload[2]); pType {
case encoding.TypeCodeOpen:
t := new(performOpen)
t := new(frames.PerformOpen)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeBegin:
t := new(performBegin)
t := new(frames.PerformBegin)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeAttach:
t := new(performAttach)
t := new(frames.PerformAttach)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeFlow:
t := new(performFlow)
t := new(frames.PerformFlow)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeTransfer:
t := new(performTransfer)
t := new(frames.PerformTransfer)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeDisposition:
t := new(performDisposition)
t := new(frames.PerformDisposition)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeDetach:
t := new(performDetach)
t := new(frames.PerformDetach)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeEnd:
t := new(performEnd)
t := new(frames.PerformEnd)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeClose:
t := new(performClose)
t := new(frames.PerformClose)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeSASLMechanism:
t := new(saslMechanisms)
t := new(frames.SASLMechanisms)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeSASLChallenge:
t := new(saslChallenge)
t := new(frames.SASLChallenge)
err := t.Unmarshal(r)
return t, err
case encoding.TypeCodeSASLOutcome:
t := new(saslOutcome)
t := new(frames.SASLOutcome)
err := t.Unmarshal(r)
return t, err
default:
@ -1082,17 +1083,17 @@ func parseFrameBody(r *buffer.Buffer) (frameBody, error) {
}
// writesFrame encodes fr into buf.
func writeFrame(buf *buffer.Buffer, fr frame) error {
func writeFrame(buf *buffer.Buffer, fr frames.Frame) error {
// write header
buf.Append([]byte{
0, 0, 0, 0, // size, overwrite later
2, // doff, see frameHeader.DataOffset comment
fr.type_, // frame type
fr.Type, // frame type
})
buf.AppendUint16(fr.channel) // channel
buf.AppendUint16(fr.Channel) // channel
// write AMQP frame body
err := encoding.Marshal(buf, fr.body)
err := encoding.Marshal(buf, fr.Body)
if err != nil {
return err
}

156
const.go
Просмотреть файл

@ -1,58 +1,23 @@
package amqp
import (
"fmt"
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
)
import "github.com/Azure/go-amqp/internal/encoding"
// Sender Settlement Modes
const (
// Sender will send all deliveries initially unsettled to the receiver.
ModeUnsettled SenderSettleMode = 0
ModeUnsettled = encoding.ModeUnsettled
// Sender will send all deliveries settled to the receiver.
ModeSettled SenderSettleMode = 1
ModeSettled = encoding.ModeSettled
// Sender MAY send a mixture of settled and unsettled deliveries to the receiver.
ModeMixed SenderSettleMode = 2
ModeMixed = encoding.ModeMixed
)
// SenderSettleMode specifies how the sender will settle messages.
type SenderSettleMode uint8
type SenderSettleMode = encoding.SenderSettleMode
func (m *SenderSettleMode) String() string {
if m == nil {
return "<nil>"
}
switch *m {
case ModeUnsettled:
return "unsettled"
case ModeSettled:
return "settled"
case ModeMixed:
return "mixed"
default:
return fmt.Sprintf("unknown sender mode %d", uint8(*m))
}
}
func (m SenderSettleMode) Marshal(wr *buffer.Buffer) error {
return encoding.Marshal(wr, uint8(m))
}
func (m *SenderSettleMode) Unmarshal(r *buffer.Buffer) error {
n, err := encoding.ReadUbyte(r)
*m = SenderSettleMode(n)
return err
}
func (m *SenderSettleMode) value() SenderSettleMode {
func senderSettleModeValue(m *SenderSettleMode) SenderSettleMode {
if m == nil {
return ModeMixed
}
@ -62,45 +27,18 @@ func (m *SenderSettleMode) value() SenderSettleMode {
// Receiver Settlement Modes
const (
// Receiver will spontaneously settle all incoming transfers.
ModeFirst ReceiverSettleMode = 0
ModeFirst = encoding.ModeFirst
// Receiver will only settle after sending the disposition to the
// sender and receiving a disposition indicating settlement of
// the delivery from the sender.
ModeSecond ReceiverSettleMode = 1
ModeSecond = encoding.ModeSecond
)
// ReceiverSettleMode specifies how the receiver will settle messages.
type ReceiverSettleMode uint8
type ReceiverSettleMode = encoding.ReceiverSettleMode
func (m *ReceiverSettleMode) String() string {
if m == nil {
return "<nil>"
}
switch *m {
case ModeFirst:
return "first"
case ModeSecond:
return "second"
default:
return fmt.Sprintf("unknown receiver mode %d", uint8(*m))
}
}
func (m ReceiverSettleMode) Marshal(wr *buffer.Buffer) error {
return encoding.Marshal(wr, uint8(m))
}
func (m *ReceiverSettleMode) Unmarshal(r *buffer.Buffer) error {
n, err := encoding.ReadUbyte(r)
*m = ReceiverSettleMode(n)
return err
}
func (m *ReceiverSettleMode) value() ReceiverSettleMode {
func receiverSettleModeValue(m *ReceiverSettleMode) ReceiverSettleMode {
if m == nil {
return ModeFirst
}
@ -110,61 +48,36 @@ func (m *ReceiverSettleMode) value() ReceiverSettleMode {
// Durability Policies
const (
// No terminus state is retained durably.
DurabilityNone Durability = 0
DurabilityNone = encoding.DurabilityNone
// Only the existence and configuration of the terminus is
// retained durably.
DurabilityConfiguration Durability = 1
DurabilityConfiguration = encoding.DurabilityConfiguration
// In addition to the existence and configuration of the
// terminus, the unsettled state for durable messages is
// retained durably.
DurabilityUnsettledState Durability = 2
DurabilityUnsettledState = encoding.DurabilityUnsettledState
)
// Durability specifies the durability of a link.
type Durability uint32
func (d *Durability) String() string {
if d == nil {
return "<nil>"
}
switch *d {
case DurabilityNone:
return "none"
case DurabilityConfiguration:
return "configuration"
case DurabilityUnsettledState:
return "unsettled-state"
default:
return fmt.Sprintf("unknown durability %d", *d)
}
}
func (d Durability) Marshal(wr *buffer.Buffer) error {
return encoding.Marshal(wr, uint32(d))
}
func (d *Durability) Unmarshal(r *buffer.Buffer) error {
return encoding.Unmarshal(r, (*uint32)(d))
}
type Durability = encoding.Durability
// Expiry Policies
const (
// The expiry timer starts when terminus is detached.
ExpiryLinkDetach ExpiryPolicy = "link-detach"
ExpiryLinkDetach = encoding.ExpiryLinkDetach
// The expiry timer starts when the most recently
// associated session is ended.
ExpirySessionEnd ExpiryPolicy = "session-end"
ExpirySessionEnd = encoding.ExpirySessionEnd
// The expiry timer starts when most recently associated
// connection is closed.
ExpiryConnectionClose ExpiryPolicy = "connection-close"
ExpiryConnectionClose = encoding.ExpiryConnectionClose
// The terminus never expires.
ExpiryNever ExpiryPolicy = "never"
ExpiryNever = encoding.ExpiryNever
)
// ExpiryPolicy specifies when the expiry timer of a terminus
@ -174,35 +87,4 @@ const (
// then the count down is aborted. If the conditions for the
// terminus-expiry-policy are subsequently re-met, the expiry timer restarts
// from its originally configured timeout value.
type ExpiryPolicy encoding.Symbol
func (e ExpiryPolicy) validate() error {
switch e {
case ExpiryLinkDetach,
ExpirySessionEnd,
ExpiryConnectionClose,
ExpiryNever:
return nil
default:
return fmt.Errorf("unknown expiry-policy %q", e)
}
}
func (e ExpiryPolicy) Marshal(wr *buffer.Buffer) error {
return encoding.Symbol(e).Marshal(wr)
}
func (e *ExpiryPolicy) Unmarshal(r *buffer.Buffer) error {
err := encoding.Unmarshal(r, (*encoding.Symbol)(e))
if err != nil {
return err
}
return e.validate()
}
func (e *ExpiryPolicy) String() string {
if e == nil {
return "<nil>"
}
return string(*e)
}
type ExpiryPolicy = encoding.ExpiryPolicy

Просмотреть файл

@ -11,6 +11,7 @@ import (
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/testconn"
"github.com/fortytw2/leaktest"
)
@ -89,46 +90,44 @@ func fuzzConn(data []byte) int {
func fuzzUnmarshal(data []byte) int {
types := []interface{}{
new(performAttach),
new(*performAttach),
new(performBegin),
new(*performBegin),
new(performClose),
new(*performClose),
new(performDetach),
new(*performDetach),
new(performDisposition),
new(*performDisposition),
new(performEnd),
new(*performEnd),
new(performFlow),
new(*performFlow),
new(performOpen),
new(*performOpen),
new(performTransfer),
new(*performTransfer),
new(source),
new(*source),
new(target),
new(*target),
new(saslCode),
new(*saslCode),
new(saslMechanisms),
new(*saslMechanisms),
new(saslChallenge),
new(*saslChallenge),
new(saslResponse),
new(*saslResponse),
new(saslOutcome),
new(*saslOutcome),
new(frames.PerformAttach),
new(*frames.PerformAttach),
new(frames.PerformBegin),
new(*frames.PerformBegin),
new(frames.PerformClose),
new(*frames.PerformClose),
new(frames.PerformDetach),
new(*frames.PerformDetach),
new(frames.PerformDisposition),
new(*frames.PerformDisposition),
new(frames.PerformEnd),
new(*frames.PerformEnd),
new(frames.PerformFlow),
new(*frames.PerformFlow),
new(frames.PerformOpen),
new(*frames.PerformOpen),
new(frames.PerformTransfer),
new(*frames.PerformTransfer),
new(frames.Source),
new(*frames.Source),
new(frames.Target),
new(*frames.Target),
new(encoding.SASLCode),
new(*encoding.SASLCode),
new(frames.SASLMechanisms),
new(*frames.SASLMechanisms),
new(frames.SASLChallenge),
new(*frames.SASLChallenge),
new(frames.SASLResponse),
new(*frames.SASLResponse),
new(frames.SASLOutcome),
new(*frames.SASLOutcome),
new(Message),
new(*Message),
new(MessageHeader),
new(*MessageHeader),
new(MessageProperties),
new(*MessageProperties),
new(role),
new(*role),
}
for _, t := range types {

Просмотреть файл

@ -128,7 +128,7 @@ func Unmarshal(r *buffer.Buffer, i interface{}) error {
}
*t = val
case *bool:
b, err := ReadBool(r)
b, err := readBool(r)
if err != nil {
return err
}
@ -494,7 +494,7 @@ func ReadAny(r *buffer.Buffer) (interface{}, error) {
// bool
case TypeCodeBool, TypeCodeBoolTrue, TypeCodeBoolFalse:
return ReadBool(r)
return readBool(r)
// uint
case TypeCodeUbyte:
@ -1038,7 +1038,7 @@ func readDouble(r *buffer.Buffer) (float64, error) {
return math.Float64frombits(bits), err
}
func ReadBool(r *buffer.Buffer) (bool, error) {
func readBool(r *buffer.Buffer) (bool, error) {
type_, err := readType(r)
if err != nil {
return false, err

Просмотреть файл

@ -77,6 +77,8 @@ func fuzzUnmarshal(data []byte) int {
new(*ErrorCondition),
new(UUID),
new(*UUID),
new(Role),
new(*Role),
}
for _, t := range types {

Просмотреть файл

@ -114,6 +114,235 @@ const (
TypeCodeDeleteOnNoLinksOrMessages AMQPType = 0x2e
)
// Durability Policies
const (
// No terminus state is retained durably.
DurabilityNone Durability = 0
// Only the existence and configuration of the terminus is
// retained durably.
DurabilityConfiguration Durability = 1
// In addition to the existence and configuration of the
// terminus, the unsettled state for durable messages is
// retained durably.
DurabilityUnsettledState Durability = 2
)
// Durability specifies the durability of a link.
type Durability uint32
func (d *Durability) String() string {
if d == nil {
return "<nil>"
}
switch *d {
case DurabilityNone:
return "none"
case DurabilityConfiguration:
return "configuration"
case DurabilityUnsettledState:
return "unsettled-state"
default:
return fmt.Sprintf("unknown durability %d", *d)
}
}
func (d Durability) Marshal(wr *buffer.Buffer) error {
return Marshal(wr, uint32(d))
}
func (d *Durability) Unmarshal(r *buffer.Buffer) error {
return Unmarshal(r, (*uint32)(d))
}
// Expiry Policies
const (
// The expiry timer starts when terminus is detached.
ExpiryLinkDetach ExpiryPolicy = "link-detach"
// The expiry timer starts when the most recently
// associated session is ended.
ExpirySessionEnd ExpiryPolicy = "session-end"
// The expiry timer starts when most recently associated
// connection is closed.
ExpiryConnectionClose ExpiryPolicy = "connection-close"
// The terminus never expires.
ExpiryNever ExpiryPolicy = "never"
)
// ExpiryPolicy specifies when the expiry timer of a terminus
// starts counting down from the timeout value.
//
// If the link is subsequently re-attached before the terminus is expired,
// then the count down is aborted. If the conditions for the
// terminus-expiry-policy are subsequently re-met, the expiry timer restarts
// from its originally configured timeout value.
type ExpiryPolicy Symbol
func ValidateExpiryPolicy(e ExpiryPolicy) error {
switch e {
case ExpiryLinkDetach,
ExpirySessionEnd,
ExpiryConnectionClose,
ExpiryNever:
return nil
default:
return fmt.Errorf("unknown expiry-policy %q", e)
}
}
func (e ExpiryPolicy) Marshal(wr *buffer.Buffer) error {
return Symbol(e).Marshal(wr)
}
func (e *ExpiryPolicy) Unmarshal(r *buffer.Buffer) error {
err := Unmarshal(r, (*Symbol)(e))
if err != nil {
return err
}
return ValidateExpiryPolicy(*e)
}
func (e *ExpiryPolicy) String() string {
if e == nil {
return "<nil>"
}
return string(*e)
}
// Sender Settlement Modes
const (
// Sender will send all deliveries initially unsettled to the receiver.
ModeUnsettled SenderSettleMode = 0
// Sender will send all deliveries settled to the receiver.
ModeSettled SenderSettleMode = 1
// Sender MAY send a mixture of settled and unsettled deliveries to the receiver.
ModeMixed SenderSettleMode = 2
)
// SenderSettleMode specifies how the sender will settle messages.
type SenderSettleMode uint8
func (m *SenderSettleMode) String() string {
if m == nil {
return "<nil>"
}
switch *m {
case ModeUnsettled:
return "unsettled"
case ModeSettled:
return "settled"
case ModeMixed:
return "mixed"
default:
return fmt.Sprintf("unknown sender mode %d", uint8(*m))
}
}
func (m SenderSettleMode) Marshal(wr *buffer.Buffer) error {
return Marshal(wr, uint8(m))
}
func (m *SenderSettleMode) Unmarshal(r *buffer.Buffer) error {
n, err := ReadUbyte(r)
*m = SenderSettleMode(n)
return err
}
// Receiver Settlement Modes
const (
// Receiver will spontaneously settle all incoming transfers.
ModeFirst ReceiverSettleMode = 0
// Receiver will only settle after sending the disposition to the
// sender and receiving a disposition indicating settlement of
// the delivery from the sender.
ModeSecond ReceiverSettleMode = 1
)
// ReceiverSettleMode specifies how the receiver will settle messages.
type ReceiverSettleMode uint8
func (m *ReceiverSettleMode) String() string {
if m == nil {
return "<nil>"
}
switch *m {
case ModeFirst:
return "first"
case ModeSecond:
return "second"
default:
return fmt.Sprintf("unknown receiver mode %d", uint8(*m))
}
}
func (m ReceiverSettleMode) Marshal(wr *buffer.Buffer) error {
return Marshal(wr, uint8(m))
}
func (m *ReceiverSettleMode) Unmarshal(r *buffer.Buffer) error {
n, err := ReadUbyte(r)
*m = ReceiverSettleMode(n)
return err
}
type Role bool
const (
RoleSender Role = false
RoleReceiver Role = true
)
func (rl Role) String() string {
if rl {
return "Receiver"
}
return "Sender"
}
func (rl *Role) Unmarshal(r *buffer.Buffer) error {
b, err := readBool(r)
*rl = Role(b)
return err
}
func (rl Role) Marshal(wr *buffer.Buffer) error {
return Marshal(wr, (bool)(rl))
}
type SASLCode uint8
// SASL Codes
const (
CodeSASLOK SASLCode = iota // Connection authentication succeeded.
CodeSASLAuth // Connection authentication failed due to an unspecified problem with the supplied credentials.
CodeSASLSysPerm // Connection authentication failed due to a system error that is unlikely to be corrected without intervention.
)
func (s SASLCode) Marshal(wr *buffer.Buffer) error {
return Marshal(wr, uint8(s))
}
func (s *SASLCode) Unmarshal(r *buffer.Buffer) error {
n, err := ReadUbyte(r)
*s = SASLCode(n)
return err
}
type DeliveryState interface{} // TODO: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#type-declared
type Unsettled map[string]DeliveryState

Просмотреть файл

@ -1,4 +1,4 @@
package amqp
package frames
import (
"errors"
@ -26,7 +26,7 @@ import (
<field name="capabilities" type="symbol" multiple="true"/>
</type>
*/
type source struct {
type Source struct {
// the address of the source
//
// The address of the source MUST NOT be set when sent on a attach frame sent by
@ -49,7 +49,7 @@ type source struct {
// 0: none
// 1: configuration
// 2: unsettled-state
Durable Durability
Durable encoding.Durability
// the expiry policy of the source
//
@ -59,7 +59,7 @@ type source struct {
// connection-close: The expiry timer starts when most recently associated connection
// is closed.
// never: The terminus never expires.
ExpiryPolicy ExpiryPolicy
ExpiryPolicy encoding.ExpiryPolicy
// duration that an expiring source will be retained
//
@ -144,11 +144,11 @@ type source struct {
Capabilities encoding.MultiSymbol
}
func (s *source) Marshal(wr *buffer.Buffer) error {
func (s *Source) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeSource, []encoding.MarshalField{
{Value: &s.Address, Omit: s.Address == ""},
{Value: &s.Durable, Omit: s.Durable == DurabilityNone},
{Value: &s.ExpiryPolicy, Omit: s.ExpiryPolicy == "" || s.ExpiryPolicy == ExpirySessionEnd},
{Value: &s.Durable, Omit: s.Durable == encoding.DurabilityNone},
{Value: &s.ExpiryPolicy, Omit: s.ExpiryPolicy == "" || s.ExpiryPolicy == encoding.ExpirySessionEnd},
{Value: &s.Timeout, Omit: s.Timeout == 0},
{Value: &s.Dynamic, Omit: !s.Dynamic},
{Value: s.DynamicNodeProperties, Omit: len(s.DynamicNodeProperties) == 0},
@ -160,11 +160,11 @@ func (s *source) Marshal(wr *buffer.Buffer) error {
})
}
func (s *source) Unmarshal(r *buffer.Buffer) error {
func (s *Source) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeSource, []encoding.UnmarshalField{
{Field: &s.Address},
{Field: &s.Durable},
{Field: &s.ExpiryPolicy, HandleNull: func() error { s.ExpiryPolicy = ExpirySessionEnd; return nil }},
{Field: &s.ExpiryPolicy, HandleNull: func() error { s.ExpiryPolicy = encoding.ExpirySessionEnd; return nil }},
{Field: &s.Timeout},
{Field: &s.Dynamic},
{Field: &s.DynamicNodeProperties},
@ -176,7 +176,7 @@ func (s *source) Unmarshal(r *buffer.Buffer) error {
}...)
}
func (s source) String() string {
func (s Source) String() string {
return fmt.Sprintf("source{Address: %s, Durable: %d, ExpiryPolicy: %s, Timeout: %d, "+
"Dynamic: %t, DynamicNodeProperties: %v, DistributionMode: %s, Filter: %v, DefaultOutcome: %v"+
"Outcomes: %v, Capabilities: %v}",
@ -206,7 +206,7 @@ func (s source) String() string {
<field name="capabilities" type="symbol" multiple="true"/>
</type>
*/
type target struct {
type Target struct {
// the address of the target
//
// The address of the target MUST NOT be set when sent on a attach frame sent by
@ -229,7 +229,7 @@ type target struct {
// 0: none
// 1: configuration
// 2: unsettled-state
Durable Durability
Durable encoding.Durability
// the expiry policy of the target
//
@ -239,7 +239,7 @@ type target struct {
// connection-close: The expiry timer starts when most recently associated connection
// is closed.
// never: The terminus never expires.
ExpiryPolicy ExpiryPolicy
ExpiryPolicy encoding.ExpiryPolicy
// duration that an expiring target will be retained
//
@ -291,11 +291,11 @@ type target struct {
Capabilities encoding.MultiSymbol
}
func (t *target) Marshal(wr *buffer.Buffer) error {
func (t *Target) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeTarget, []encoding.MarshalField{
{Value: &t.Address, Omit: t.Address == ""},
{Value: &t.Durable, Omit: t.Durable == DurabilityNone},
{Value: &t.ExpiryPolicy, Omit: t.ExpiryPolicy == "" || t.ExpiryPolicy == ExpirySessionEnd},
{Value: &t.Durable, Omit: t.Durable == encoding.DurabilityNone},
{Value: &t.ExpiryPolicy, Omit: t.ExpiryPolicy == "" || t.ExpiryPolicy == encoding.ExpirySessionEnd},
{Value: &t.Timeout, Omit: t.Timeout == 0},
{Value: &t.Dynamic, Omit: !t.Dynamic},
{Value: t.DynamicNodeProperties, Omit: len(t.DynamicNodeProperties) == 0},
@ -303,11 +303,11 @@ func (t *target) Marshal(wr *buffer.Buffer) error {
})
}
func (t *target) Unmarshal(r *buffer.Buffer) error {
func (t *Target) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeTarget, []encoding.UnmarshalField{
{Field: &t.Address},
{Field: &t.Durable},
{Field: &t.ExpiryPolicy, HandleNull: func() error { t.ExpiryPolicy = ExpirySessionEnd; return nil }},
{Field: &t.ExpiryPolicy, HandleNull: func() error { t.ExpiryPolicy = encoding.ExpirySessionEnd; return nil }},
{Field: &t.Timeout},
{Field: &t.Dynamic},
{Field: &t.DynamicNodeProperties},
@ -315,7 +315,7 @@ func (t *target) Unmarshal(r *buffer.Buffer) error {
}...)
}
func (t target) String() string {
func (t Target) String() string {
return fmt.Sprintf("source{Address: %s, Durable: %d, ExpiryPolicy: %s, Timeout: %d, "+
"Dynamic: %t, DynamicNodeProperties: %v, Capabilities: %v}",
t.Address,
@ -329,17 +329,17 @@ func (t target) String() string {
}
// frame is the decoded representation of a frame
type frame struct {
type_ uint8 // AMQP/SASL
channel uint16 // channel this frame is for
body frameBody // body of the frame
type Frame struct {
Type uint8 // AMQP/SASL
Channel uint16 // channel this frame is for
Body FrameBody // body of the frame
// optional channel which will be closed after net transmit
done chan encoding.DeliveryState
Done chan encoding.DeliveryState
}
// frameBody adds some type safety to frame encoding
type frameBody interface {
type FrameBody interface {
frameBody()
}
@ -359,7 +359,7 @@ type frameBody interface {
</type>
*/
type performOpen struct {
type PerformOpen struct {
ContainerID string // required
Hostname string
MaxFrameSize uint32 // default: 4294967295
@ -372,9 +372,9 @@ type performOpen struct {
Properties map[encoding.Symbol]interface{}
}
func (o *performOpen) frameBody() {}
func (o *PerformOpen) frameBody() {}
func (o *performOpen) Marshal(wr *buffer.Buffer) error {
func (o *PerformOpen) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeOpen, []encoding.MarshalField{
{Value: &o.ContainerID, Omit: false},
{Value: &o.Hostname, Omit: o.Hostname == ""},
@ -389,7 +389,7 @@ func (o *performOpen) Marshal(wr *buffer.Buffer) error {
})
}
func (o *performOpen) Unmarshal(r *buffer.Buffer) error {
func (o *PerformOpen) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeOpen, []encoding.UnmarshalField{
{Field: &o.ContainerID, HandleNull: func() error { return errors.New("Open.ContainerID is required") }},
{Field: &o.Hostname},
@ -404,7 +404,7 @@ func (o *performOpen) Unmarshal(r *buffer.Buffer) error {
}...)
}
func (o *performOpen) String() string {
func (o *PerformOpen) String() string {
return fmt.Sprintf("Open{ContainerID : %s, Hostname: %s, MaxFrameSize: %d, "+
"ChannelMax: %d, IdleTimeout: %v, "+
"OutgoingLocales: %v, IncomingLocales: %v, "+
@ -436,7 +436,7 @@ func (o *performOpen) String() string {
<field name="properties" type="fields"/>
</type>
*/
type performBegin struct {
type PerformBegin struct {
// the remote channel for this session
// If a session is locally initiated, the remote-channel MUST NOT be set.
// When an endpoint responds to a remotely initiated session, the remote-channel
@ -474,9 +474,9 @@ type performBegin struct {
Properties map[encoding.Symbol]interface{}
}
func (b *performBegin) frameBody() {}
func (b *PerformBegin) frameBody() {}
func (b *performBegin) String() string {
func (b *PerformBegin) String() string {
return fmt.Sprintf("Begin{RemoteChannel: %v, NextOutgoingID: %d, IncomingWindow: %d, "+
"OutgoingWindow: %d, HandleMax: %d, OfferedCapabilities: %v, DesiredCapabilities: %v, "+
"Properties: %v}",
@ -498,7 +498,7 @@ func formatUint16Ptr(p *uint16) string {
return strconv.FormatUint(uint64(*p), 10)
}
func (b *performBegin) Marshal(wr *buffer.Buffer) error {
func (b *PerformBegin) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeBegin, []encoding.MarshalField{
{Value: b.RemoteChannel, Omit: b.RemoteChannel == nil},
{Value: &b.NextOutgoingID, Omit: false},
@ -511,7 +511,7 @@ func (b *performBegin) Marshal(wr *buffer.Buffer) error {
})
}
func (b *performBegin) Unmarshal(r *buffer.Buffer) error {
func (b *PerformBegin) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeBegin, []encoding.UnmarshalField{
{Field: &b.RemoteChannel},
{Field: &b.NextOutgoingID, HandleNull: func() error { return errors.New("Begin.NextOutgoingID is required") }},
@ -543,7 +543,7 @@ func (b *performBegin) Unmarshal(r *buffer.Buffer) error {
<field name="properties" type="fields"/>
</type>
*/
type performAttach struct {
type PerformAttach struct {
// the name of the link
//
// This name uniquely identifies the link from the container of the source
@ -572,7 +572,7 @@ type performAttach struct {
//
// The role being played by the peer, i.e., whether the peer is the sender or the
// receiver of messages on the link.
Role role
Role encoding.Role
// settlement policy for the sender
//
@ -585,7 +585,7 @@ type performAttach struct {
// 0: unsettled - The sender will send all deliveries initially unsettled to the receiver.
// 1: settled - The sender will send all deliveries settled to the receiver.
// 2: mixed - The sender MAY send a mixture of settled and unsettled deliveries to the receiver.
SenderSettleMode *SenderSettleMode
SenderSettleMode *encoding.SenderSettleMode
// the settlement policy of the receiver
//
@ -599,19 +599,19 @@ type performAttach struct {
// 1: second - The receiver will only settle after sending the disposition to
// the sender and receiving a disposition indicating settlement of
// the delivery from the sender.
ReceiverSettleMode *ReceiverSettleMode
ReceiverSettleMode *encoding.ReceiverSettleMode
// the source for messages
//
// If no source is specified on an outgoing link, then there is no source currently
// attached to the link. A link with no source will never produce outgoing messages.
Source *source
Source *Source
// the target for messages
//
// If no target is specified on an incoming link, then there is no target currently
// attached to the link. A link with no target will never permit incoming messages.
Target *target
Target *Target
// unsettled delivery state
//
@ -674,9 +674,9 @@ type performAttach struct {
Properties map[encoding.Symbol]interface{}
}
func (a *performAttach) frameBody() {}
func (a *PerformAttach) frameBody() {}
func (a performAttach) String() string {
func (a PerformAttach) String() string {
return fmt.Sprintf("Attach{Name: %s, Handle: %d, Role: %s, SenderSettleMode: %s, ReceiverSettleMode: %s, "+
"Source: %v, Target: %v, Unsettled: %v, IncompleteUnsettled: %t, InitialDeliveryCount: %d, MaxMessageSize: %d, "+
"OfferedCapabilities: %v, DesiredCapabilities: %v, Properties: %v}",
@ -697,7 +697,7 @@ func (a performAttach) String() string {
)
}
func (a *performAttach) Marshal(wr *buffer.Buffer) error {
func (a *PerformAttach) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeAttach, []encoding.MarshalField{
{Value: &a.Name, Omit: false},
{Value: &a.Handle, Omit: false},
@ -708,7 +708,7 @@ func (a *performAttach) Marshal(wr *buffer.Buffer) error {
{Value: a.Target, Omit: a.Target == nil},
{Value: a.Unsettled, Omit: len(a.Unsettled) == 0},
{Value: &a.IncompleteUnsettled, Omit: !a.IncompleteUnsettled},
{Value: &a.InitialDeliveryCount, Omit: a.Role == roleReceiver},
{Value: &a.InitialDeliveryCount, Omit: a.Role == encoding.RoleReceiver},
{Value: &a.MaxMessageSize, Omit: a.MaxMessageSize == 0},
{Value: &a.OfferedCapabilities, Omit: len(a.OfferedCapabilities) == 0},
{Value: &a.DesiredCapabilities, Omit: len(a.DesiredCapabilities) == 0},
@ -716,7 +716,7 @@ func (a *performAttach) Marshal(wr *buffer.Buffer) error {
})
}
func (a *performAttach) Unmarshal(r *buffer.Buffer) error {
func (a *PerformAttach) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeAttach, []encoding.UnmarshalField{
{Field: &a.Name, HandleNull: func() error { return errors.New("Attach.Name is required") }},
{Field: &a.Handle, HandleNull: func() error { return errors.New("Attach.Handle is required") }},
@ -751,7 +751,7 @@ func (a *performAttach) Unmarshal(r *buffer.Buffer) error {
<field name="properties" type="fields"/>
</type>
*/
type performFlow struct {
type PerformFlow struct {
// Identifies the expected transfer-id of the next incoming transfer frame.
// This value MUST be set if the peer has received the begin frame for the
// session, and MUST NOT be set if it has not. See subsection 2.5.6 for more details.
@ -850,9 +850,9 @@ type performFlow struct {
Properties map[encoding.Symbol]interface{}
}
func (f *performFlow) frameBody() {}
func (f *PerformFlow) frameBody() {}
func (f *performFlow) String() string {
func (f *PerformFlow) String() string {
return fmt.Sprintf("Flow{NextIncomingID: %s, IncomingWindow: %d, NextOutgoingID: %d, OutgoingWindow: %d, "+
"Handle: %s, DeliveryCount: %s, LinkCredit: %s, Available: %s, Drain: %t, Echo: %t, Properties: %+v}",
formatUint32Ptr(f.NextIncomingID),
@ -876,7 +876,7 @@ func formatUint32Ptr(p *uint32) string {
return strconv.FormatUint(uint64(*p), 10)
}
func (f *performFlow) Marshal(wr *buffer.Buffer) error {
func (f *PerformFlow) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeFlow, []encoding.MarshalField{
{Value: f.NextIncomingID, Omit: f.NextIncomingID == nil},
{Value: &f.IncomingWindow, Omit: false},
@ -892,7 +892,7 @@ func (f *performFlow) Marshal(wr *buffer.Buffer) error {
})
}
func (f *performFlow) Unmarshal(r *buffer.Buffer) error {
func (f *PerformFlow) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeFlow, []encoding.UnmarshalField{
{Field: &f.NextIncomingID},
{Field: &f.IncomingWindow, HandleNull: func() error { return errors.New("Flow.IncomingWindow is required") }},
@ -924,7 +924,7 @@ func (f *performFlow) Unmarshal(r *buffer.Buffer) error {
<field name="batchable" type="boolean" default="false"/>
</type>
*/
type performTransfer struct {
type PerformTransfer struct {
// Specifies the link on which the message is transferred.
Handle uint32 // required
@ -997,7 +997,7 @@ type performTransfer struct {
// 1: second - The receiver will only settle after sending the disposition to
// the sender and receiving a disposition indicating settlement of
// the delivery from the sender.
ReceiverSettleMode *ReceiverSettleMode
ReceiverSettleMode *encoding.ReceiverSettleMode
// the state of the delivery at the sender
//
@ -1065,12 +1065,12 @@ type performTransfer struct {
//
// Settled=true: closed when the transferred on network.
// Settled=false: closed when the receiver has confirmed settlement.
done chan encoding.DeliveryState
Done chan encoding.DeliveryState
}
func (t *performTransfer) frameBody() {}
func (t *PerformTransfer) frameBody() {}
func (t performTransfer) String() string {
func (t PerformTransfer) String() string {
deliveryTag := "<nil>"
if t.DeliveryTag != nil {
deliveryTag = fmt.Sprintf("%q", t.DeliveryTag)
@ -1094,7 +1094,7 @@ func (t performTransfer) String() string {
)
}
func (t *performTransfer) Marshal(wr *buffer.Buffer) error {
func (t *PerformTransfer) Marshal(wr *buffer.Buffer) error {
err := encoding.MarshalComposite(wr, encoding.TypeCodeTransfer, []encoding.MarshalField{
{Value: &t.Handle},
{Value: t.DeliveryID, Omit: t.DeliveryID == nil},
@ -1116,7 +1116,7 @@ func (t *performTransfer) Marshal(wr *buffer.Buffer) error {
return nil
}
func (t *performTransfer) Unmarshal(r *buffer.Buffer) error {
func (t *PerformTransfer) Unmarshal(r *buffer.Buffer) error {
err := encoding.UnmarshalComposite(r, encoding.TypeCodeTransfer, []encoding.UnmarshalField{
{Field: &t.Handle, HandleNull: func() error { return errors.New("Transfer.Handle is required") }},
{Field: &t.DeliveryID},
@ -1150,12 +1150,12 @@ func (t *performTransfer) Unmarshal(r *buffer.Buffer) error {
<field name="batchable" type="boolean" default="false"/>
</type>
*/
type performDisposition struct {
type PerformDisposition struct {
// directionality of disposition
//
// The role identifies whether the disposition frame contains information about
// sending link endpoints or receiving link endpoints.
Role role
Role encoding.Role
// lower bound of deliveries
//
@ -1188,9 +1188,9 @@ type performDisposition struct {
Batchable bool
}
func (d *performDisposition) frameBody() {}
func (d *PerformDisposition) frameBody() {}
func (d performDisposition) String() string {
func (d PerformDisposition) String() string {
return fmt.Sprintf("Disposition{Role: %s, First: %d, Last: %s, Settled: %t, State: %s, Batchable: %t}",
d.Role,
d.First,
@ -1201,7 +1201,7 @@ func (d performDisposition) String() string {
)
}
func (d *performDisposition) Marshal(wr *buffer.Buffer) error {
func (d *PerformDisposition) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeDisposition, []encoding.MarshalField{
{Value: &d.Role, Omit: false},
{Value: &d.First, Omit: false},
@ -1212,7 +1212,7 @@ func (d *performDisposition) Marshal(wr *buffer.Buffer) error {
})
}
func (d *performDisposition) Unmarshal(r *buffer.Buffer) error {
func (d *PerformDisposition) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeDisposition, []encoding.UnmarshalField{
{Field: &d.Role, HandleNull: func() error { return errors.New("Disposition.Role is required") }},
{Field: &d.First, HandleNull: func() error { return errors.New("Disposition.Handle is required") }},
@ -1231,7 +1231,7 @@ func (d *performDisposition) Unmarshal(r *buffer.Buffer) error {
<field name="error" type="error"/>
</type>
*/
type performDetach struct {
type PerformDetach struct {
// the local handle of the link to be detached
Handle uint32 //required
@ -1242,12 +1242,12 @@ type performDetach struct {
//
// If set, this field indicates that the link is being detached due to an error
// condition. The value of the field SHOULD contain details on the cause of the error.
Error *Error
Error *encoding.Error
}
func (d *performDetach) frameBody() {}
func (d *PerformDetach) frameBody() {}
func (d performDetach) String() string {
func (d PerformDetach) String() string {
return fmt.Sprintf("Detach{Handle: %d, Closed: %t, Error: %v}",
d.Handle,
d.Closed,
@ -1255,7 +1255,7 @@ func (d performDetach) String() string {
)
}
func (d *performDetach) Marshal(wr *buffer.Buffer) error {
func (d *PerformDetach) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeDetach, []encoding.MarshalField{
{Value: &d.Handle, Omit: false},
{Value: &d.Closed, Omit: !d.Closed},
@ -1263,7 +1263,7 @@ func (d *performDetach) Marshal(wr *buffer.Buffer) error {
})
}
func (d *performDetach) Unmarshal(r *buffer.Buffer) error {
func (d *PerformDetach) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeDetach, []encoding.UnmarshalField{
{Field: &d.Handle, HandleNull: func() error { return errors.New("Detach.Handle is required") }},
{Field: &d.Closed},
@ -1277,23 +1277,23 @@ func (d *performDetach) Unmarshal(r *buffer.Buffer) error {
<field name="error" type="error"/>
</type>
*/
type performEnd struct {
type PerformEnd struct {
// error causing the end
//
// If set, this field indicates that the session is being ended due to an error
// condition. The value of the field SHOULD contain details on the cause of the error.
Error *Error
Error *encoding.Error
}
func (e *performEnd) frameBody() {}
func (e *PerformEnd) frameBody() {}
func (e *performEnd) Marshal(wr *buffer.Buffer) error {
func (e *PerformEnd) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeEnd, []encoding.MarshalField{
{Value: e.Error, Omit: e.Error == nil},
})
}
func (e *performEnd) Unmarshal(r *buffer.Buffer) error {
func (e *PerformEnd) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeEnd,
encoding.UnmarshalField{Field: &e.Error},
)
@ -1305,29 +1305,29 @@ func (e *performEnd) Unmarshal(r *buffer.Buffer) error {
<field name="error" type="error"/>
</type>
*/
type performClose struct {
type PerformClose struct {
// error causing the close
//
// If set, this field indicates that the session is being closed due to an error
// condition. The value of the field SHOULD contain details on the cause of the error.
Error *Error
Error *encoding.Error
}
func (c *performClose) frameBody() {}
func (c *PerformClose) frameBody() {}
func (c *performClose) Marshal(wr *buffer.Buffer) error {
func (c *PerformClose) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeClose, []encoding.MarshalField{
{Value: c.Error, Omit: c.Error == nil},
})
}
func (c *performClose) Unmarshal(r *buffer.Buffer) error {
func (c *PerformClose) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeClose,
encoding.UnmarshalField{Field: &c.Error},
)
}
func (c *performClose) String() string {
func (c *PerformClose) String() string {
return fmt.Sprintf("Close{Error: %s}", c.Error)
}
@ -1340,15 +1340,15 @@ func (c *performClose) String() string {
</type>
*/
type saslInit struct {
type SASLInit struct {
Mechanism encoding.Symbol
InitialResponse []byte
Hostname string
}
func (si *saslInit) frameBody() {}
func (si *SASLInit) frameBody() {}
func (si *saslInit) Marshal(wr *buffer.Buffer) error {
func (si *SASLInit) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeSASLInit, []encoding.MarshalField{
{Value: &si.Mechanism, Omit: false},
{Value: &si.InitialResponse, Omit: len(si.InitialResponse) == 0},
@ -1356,7 +1356,7 @@ func (si *saslInit) Marshal(wr *buffer.Buffer) error {
})
}
func (si *saslInit) Unmarshal(r *buffer.Buffer) error {
func (si *SASLInit) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeSASLInit, []encoding.UnmarshalField{
{Field: &si.Mechanism, HandleNull: func() error { return errors.New("saslInit.Mechanism is required") }},
{Field: &si.InitialResponse},
@ -1364,7 +1364,7 @@ func (si *saslInit) Unmarshal(r *buffer.Buffer) error {
}...)
}
func (si *saslInit) String() string {
func (si *SASLInit) String() string {
// Elide the InitialResponse as it may contain a plain text secret.
return fmt.Sprintf("SaslInit{Mechanism : %s, InitialResponse: ********, Hostname: %s}",
si.Mechanism,
@ -1379,25 +1379,25 @@ func (si *saslInit) String() string {
</type>
*/
type saslMechanisms struct {
type SASLMechanisms struct {
Mechanisms encoding.MultiSymbol
}
func (sm *saslMechanisms) frameBody() {}
func (sm *SASLMechanisms) frameBody() {}
func (sm *saslMechanisms) Marshal(wr *buffer.Buffer) error {
func (sm *SASLMechanisms) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeSASLMechanism, []encoding.MarshalField{
{Value: &sm.Mechanisms, Omit: false},
})
}
func (sm *saslMechanisms) Unmarshal(r *buffer.Buffer) error {
func (sm *SASLMechanisms) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeSASLMechanism,
encoding.UnmarshalField{Field: &sm.Mechanisms, HandleNull: func() error { return errors.New("saslMechanisms.Mechanisms is required") }},
)
}
func (sm *saslMechanisms) String() string {
func (sm *SASLMechanisms) String() string {
return fmt.Sprintf("SaslMechanisms{Mechanisms : %v}",
sm.Mechanisms,
)
@ -1410,23 +1410,23 @@ func (sm *saslMechanisms) String() string {
</type>
*/
type saslChallenge struct {
type SASLChallenge struct {
Challenge []byte
}
func (sc *saslChallenge) String() string {
func (sc *SASLChallenge) String() string {
return "Challenge{Challenge: ********}"
}
func (sc *saslChallenge) frameBody() {}
func (sc *SASLChallenge) frameBody() {}
func (sc *saslChallenge) Marshal(wr *buffer.Buffer) error {
func (sc *SASLChallenge) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeSASLChallenge, []encoding.MarshalField{
{Value: &sc.Challenge, Omit: false},
})
}
func (sc *saslChallenge) Unmarshal(r *buffer.Buffer) error {
func (sc *SASLChallenge) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeSASLChallenge, []encoding.UnmarshalField{
{Field: &sc.Challenge, HandleNull: func() error { return errors.New("saslChallenge.Challenge is required") }},
}...)
@ -1439,23 +1439,23 @@ func (sc *saslChallenge) Unmarshal(r *buffer.Buffer) error {
</type>
*/
type saslResponse struct {
type SASLResponse struct {
Response []byte
}
func (sr *saslResponse) String() string {
func (sr *SASLResponse) String() string {
return "Response{Response: ********}"
}
func (sr *saslResponse) frameBody() {}
func (sr *SASLResponse) frameBody() {}
func (sr *saslResponse) Marshal(wr *buffer.Buffer) error {
func (sr *SASLResponse) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeSASLResponse, []encoding.MarshalField{
{Value: &sr.Response, Omit: false},
})
}
func (sr *saslResponse) Unmarshal(r *buffer.Buffer) error {
func (sr *SASLResponse) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeSASLResponse, []encoding.UnmarshalField{
{Field: &sr.Response, HandleNull: func() error { return errors.New("saslResponse.Response is required") }},
}...)
@ -1469,28 +1469,28 @@ func (sr *saslResponse) Unmarshal(r *buffer.Buffer) error {
</type>
*/
type saslOutcome struct {
Code saslCode
type SASLOutcome struct {
Code encoding.SASLCode
AdditionalData []byte
}
func (so *saslOutcome) frameBody() {}
func (so *SASLOutcome) frameBody() {}
func (so *saslOutcome) Marshal(wr *buffer.Buffer) error {
func (so *SASLOutcome) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeSASLOutcome, []encoding.MarshalField{
{Value: &so.Code, Omit: false},
{Value: &so.AdditionalData, Omit: len(so.AdditionalData) == 0},
})
}
func (so *saslOutcome) Unmarshal(r *buffer.Buffer) error {
func (so *SASLOutcome) Unmarshal(r *buffer.Buffer) error {
return encoding.UnmarshalComposite(r, encoding.TypeCodeSASLOutcome, []encoding.UnmarshalField{
{Field: &so.Code, HandleNull: func() error { return errors.New("saslOutcome.AdditionalData is required") }},
{Field: &so.AdditionalData},
}...)
}
func (so *saslOutcome) String() string {
func (so *SASLOutcome) String() string {
return fmt.Sprintf("SaslOutcome{Code : %v, AdditionalData: %v}",
so.Code,
so.AdditionalData,

103
link.go
Просмотреть файл

@ -10,32 +10,9 @@ import (
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
type role bool
const (
roleSender role = false
roleReceiver role = true
)
func (rl role) String() string {
if rl {
return "Receiver"
}
return "Sender"
}
func (rl *role) Unmarshal(r *buffer.Buffer) error {
b, err := encoding.ReadBool(r)
*rl = role(b)
return err
}
func (rl role) Marshal(wr *buffer.Buffer) error {
return encoding.Marshal(wr, (bool)(rl))
}
// link is a unidirectional route.
//
// May be used for sending or receiving.
@ -44,8 +21,8 @@ type link struct {
handle uint32 // our handle
remoteHandle uint32 // remote's handle
dynamicAddr bool // request a dynamic link address from the server
rx chan frameBody // sessions sends frames for this link on this channel
transfers chan performTransfer // sender uses to send transfer frames
rx chan frames.FrameBody // sessions sends frames for this link on this channel
transfers chan frames.PerformTransfer // sender uses to send transfer frames
closeOnce sync.Once // closeOnce protects close from being closed multiple times
// NOTE: `close` and `detached` BOTH need to be checked to determine if the link
@ -61,8 +38,8 @@ type link struct {
detachError *Error // error to send to remote on detach, set by closeWithError
session *Session // parent session
receiver *Receiver // allows link options to modify Receiver
source *source
target *target
source *frames.Source
target *frames.Target
properties map[encoding.Symbol]interface{} // additional properties sent upon link attach
// Indicates whether we should allow detaches on disposition errors or not.
// Some AMQP servers (like Event Hubs) benefit from keeping the link open on disposition errors
@ -97,7 +74,7 @@ type link struct {
func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
l := &link{
key: linkKey{randString(40), role(r != nil)},
key: linkKey{randString(40), encoding.Role(r != nil)},
session: s,
receiver: r,
close: make(chan struct{}),
@ -130,12 +107,12 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
// attempting to send to a slow reader
if isReceiver {
if l.receiver.manualCreditor != nil {
l.rx = make(chan frameBody, l.receiver.maxCredit)
l.rx = make(chan frames.FrameBody, l.receiver.maxCredit)
} else {
l.rx = make(chan frameBody, l.linkCredit)
l.rx = make(chan frames.FrameBody, l.linkCredit)
}
} else {
l.rx = make(chan frameBody, 1)
l.rx = make(chan frames.FrameBody, 1)
}
// request handle from Session.mux
@ -157,7 +134,7 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
return nil, l.err
}
attach := &performAttach{
attach := &frames.PerformAttach{
Name: l.key.name,
Handle: l.handle,
ReceiverSettleMode: l.receiverSettleMode,
@ -169,15 +146,15 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
}
if isReceiver {
attach.Role = roleReceiver
attach.Role = encoding.RoleReceiver
if attach.Source == nil {
attach.Source = new(source)
attach.Source = new(frames.Source)
}
attach.Source.Dynamic = l.dynamicAddr
} else {
attach.Role = roleSender
attach.Role = encoding.RoleSender
if attach.Target == nil {
attach.Target = new(target)
attach.Target = new(frames.Target)
}
attach.Target.Dynamic = l.dynamicAddr
}
@ -187,14 +164,14 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
_ = s.txFrame(attach, nil)
// wait for response
var fr frameBody
var fr frames.FrameBody
select {
case <-s.done:
return nil, s.err
case fr = <-l.rx:
}
debug(3, "RX: %s", fr)
resp, ok := fr.(*performAttach)
resp, ok := fr.(*frames.PerformAttach)
if !ok {
return nil, fmt.Errorf("unexpected attach response: %#v", fr)
}
@ -216,13 +193,13 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
case fr = <-l.rx:
}
detach, ok := fr.(*performDetach)
detach, ok := fr.(*frames.PerformDetach)
if !ok {
return nil, fmt.Errorf("unexpected frame while waiting for detach: %#v", fr)
}
// send return detach
fr = &performDetach{
fr = &frames.PerformDetach{
Handle: l.handle,
Closed: true,
}
@ -256,7 +233,7 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
if l.dynamicAddr && resp.Target != nil {
l.target.Address = resp.Target.Address
}
l.transfers = make(chan performTransfer)
l.transfers = make(chan frames.PerformTransfer)
}
err = l.setSettleModes(resp)
@ -289,14 +266,14 @@ func (l *link) countUnsettled() int {
return count
}
// setSettleModes sets the settlement modes based on the resp performAttach.
// setSettleModes sets the settlement modes based on the resp frames.PerformAttach.
//
// If a settlement mode has been explicitly set locally and it was not honored by the
// server an error is returned.
func (l *link) setSettleModes(resp *performAttach) error {
func (l *link) setSettleModes(resp *frames.PerformAttach) error {
var (
localRecvSettle = l.receiverSettleMode.value()
respRecvSettle = resp.ReceiverSettleMode.value()
localRecvSettle = receiverSettleModeValue(l.receiverSettleMode)
respRecvSettle = receiverSettleModeValue(resp.ReceiverSettleMode)
)
if l.receiverSettleMode != nil && localRecvSettle != respRecvSettle {
return fmt.Errorf("amqp: receiver settlement mode %q requested, received %q from server", l.receiverSettleMode, &respRecvSettle)
@ -304,8 +281,8 @@ func (l *link) setSettleModes(resp *performAttach) error {
l.receiverSettleMode = &respRecvSettle
var (
localSendSettle = l.senderSettleMode.value()
respSendSettle = resp.SenderSettleMode.value()
localSendSettle = senderSettleModeValue(l.senderSettleMode)
respSendSettle = senderSettleModeValue(resp.SenderSettleMode)
)
if l.senderSettleMode != nil && localSendSettle != respSendSettle {
return fmt.Errorf("amqp: sender settlement mode %q requested, received %q from server", l.senderSettleMode, &respSendSettle)
@ -369,7 +346,7 @@ func (l *link) mux() {
Loop:
for {
var outgoingTransfers chan performTransfer
var outgoingTransfers chan frames.PerformTransfer
ok, enableOutgoingTransfers := l.doFlow()
@ -440,7 +417,7 @@ func (l *link) muxFlow(linkCredit uint32, drain bool) error {
debug(3, "link.muxFlow(): len(l.messages):%d - linkCredit: %d - deliveryCount: %d, inFlight: %d", len(l.messages), linkCredit, deliveryCount, len(l.receiver.inFlight.m))
fr := &performFlow{
fr := &frames.PerformFlow{
Handle: &l.handle,
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages,
@ -472,7 +449,7 @@ func (l *link) muxFlow(linkCredit uint32, drain bool) error {
}
}
func (l *link) muxReceive(fr performTransfer) error {
func (l *link) muxReceive(fr frames.PerformTransfer) error {
if !l.more {
// this is the first transfer of a message,
// record the delivery ID, message format,
@ -592,7 +569,7 @@ func (l *link) muxReceive(fr performTransfer) error {
debug(1, "deliveryID %d before push to receiver - deliveryCount : %d - linkCredit: %d, len(messages): %d, len(inflight): %d", l.msg.deliveryID, l.deliveryCount, l.linkCredit, len(l.messages), len(l.receiver.inFlight.m))
// send to receiver, this should never block due to buffering
// and flow control.
if l.receiverSettleMode.value() == ModeSecond {
if receiverSettleModeValue(l.receiverSettleMode) == ModeSecond {
l.addUnsettled(&l.msg)
}
l.messages <- l.msg
@ -645,7 +622,7 @@ func (l *link) issueCredit(credit uint32) error {
}
// muxHandleFrame processes fr based on type.
func (l *link) muxHandleFrame(fr frameBody) error {
func (l *link) muxHandleFrame(fr frames.FrameBody) error {
var (
isSender = l.receiver == nil
errOnRejectDisposition = l.detachOnDispositionError && (isSender && (l.receiverSettleMode == nil || *l.receiverSettleMode == ModeFirst))
@ -653,7 +630,7 @@ func (l *link) muxHandleFrame(fr frameBody) error {
switch fr := fr.(type) {
// message frame
case *performTransfer:
case *frames.PerformTransfer:
debug(3, "RX: %s", fr)
if isSender {
// Senders should never receive transfer frames, but handle it just in case.
@ -667,7 +644,7 @@ func (l *link) muxHandleFrame(fr frameBody) error {
return l.muxReceive(*fr)
// flow control frame
case *performFlow:
case *frames.PerformFlow:
debug(3, "RX: %s", fr)
if isSender {
linkCredit := *fr.LinkCredit - l.deliveryCount
@ -697,7 +674,7 @@ func (l *link) muxHandleFrame(fr frameBody) error {
)
// send flow
resp := &performFlow{
resp := &frames.PerformFlow{
Handle: &l.handle,
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages
@ -706,7 +683,7 @@ func (l *link) muxHandleFrame(fr frameBody) error {
_ = l.session.txFrame(resp, nil)
// remote side is closing links
case *performDetach:
case *frames.PerformDetach:
debug(1, "RX: %s", fr)
// don't currently support link detach and reattach
if !fr.Closed {
@ -718,7 +695,7 @@ func (l *link) muxHandleFrame(fr frameBody) error {
return fmt.Errorf("received detach frame %v", &DetachError{fr.Error})
case *performDisposition:
case *frames.PerformDisposition:
debug(3, "RX: %s", fr)
// Unblock receivers waiting for message disposition
@ -742,8 +719,8 @@ func (l *link) muxHandleFrame(fr frameBody) error {
return nil
}
resp := &performDisposition{
Role: roleSender,
resp := &frames.PerformDisposition{
Role: encoding.RoleSender,
First: fr.First,
Last: fr.Last,
Settled: true,
@ -838,7 +815,7 @@ func (l *link) muxDetach() {
detachError := l.detachError
l.detachErrorMu.Unlock()
fr := &performDetach{
fr := &frames.PerformDetach{
Handle: l.handle,
Closed: true,
Error: detachError,
@ -852,7 +829,7 @@ Loop:
break Loop
case fr := <-l.rx:
// discard incoming frames to avoid blocking session.mux
if fr, ok := fr.(*performDetach); ok && fr.Closed {
if fr, ok := fr.(*frames.PerformDetach); ok && fr.Closed {
l.detachReceived = true
}
case <-l.session.done:
@ -874,7 +851,7 @@ Loop:
// read from link until detach with Close == true is received,
// other frames are discarded.
case fr := <-l.rx:
if fr, ok := fr.(*performDetach); ok && fr.Closed {
if fr, ok := fr.(*frames.PerformDetach); ok && fr.Closed {
return
}

Просмотреть файл

@ -6,6 +6,7 @@ import (
"time"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
// LinkOption is a function for configuring an AMQP link.
@ -78,7 +79,7 @@ func LinkName(name string) LinkOption {
func LinkSourceCapabilities(capabilities ...string) LinkOption {
return func(l *link) error {
if l.source == nil {
l.source = new(source)
l.source = new(frames.Source)
}
// Convert string to symbol
@ -96,7 +97,7 @@ func LinkSourceCapabilities(capabilities ...string) LinkOption {
func LinkSourceAddress(addr string) LinkOption {
return func(l *link) error {
if l.source == nil {
l.source = new(source)
l.source = new(frames.Source)
}
l.source.Address = addr
return nil
@ -107,7 +108,7 @@ func LinkSourceAddress(addr string) LinkOption {
func LinkTargetAddress(addr string) LinkOption {
return func(l *link) error {
if l.target == nil {
l.target = new(target)
l.target = new(frames.Target)
}
l.target.Address = addr
return nil
@ -235,7 +236,7 @@ func LinkSelectorFilter(filter string) LinkOption {
func LinkSourceFilter(name string, code uint64, value interface{}) LinkOption {
return func(l *link) error {
if l.source == nil {
l.source = new(source)
l.source = new(frames.Source)
}
if l.source.Filter == nil {
l.source.Filter = make(map[encoding.Symbol]*encoding.DescribedType)
@ -279,7 +280,7 @@ func LinkTargetDurability(d Durability) LinkOption {
}
if l.target == nil {
l.target = new(target)
l.target = new(frames.Target)
}
l.target.Durable = d
@ -292,13 +293,13 @@ func LinkTargetDurability(d Durability) LinkOption {
// Default: ExpirySessionEnd.
func LinkTargetExpiryPolicy(p ExpiryPolicy) LinkOption {
return func(l *link) error {
err := p.validate()
err := encoding.ValidateExpiryPolicy(p)
if err != nil {
return err
}
if l.target == nil {
l.target = new(target)
l.target = new(frames.Target)
}
l.target.ExpiryPolicy = p
@ -312,7 +313,7 @@ func LinkTargetExpiryPolicy(p ExpiryPolicy) LinkOption {
func LinkTargetTimeout(timeout uint32) LinkOption {
return func(l *link) error {
if l.target == nil {
l.target = new(target)
l.target = new(frames.Target)
}
l.target.Timeout = timeout
@ -330,7 +331,7 @@ func LinkSourceDurability(d Durability) LinkOption {
}
if l.source == nil {
l.source = new(source)
l.source = new(frames.Source)
}
l.source.Durable = d
@ -343,13 +344,13 @@ func LinkSourceDurability(d Durability) LinkOption {
// Default: ExpirySessionEnd.
func LinkSourceExpiryPolicy(p ExpiryPolicy) LinkOption {
return func(l *link) error {
err := p.validate()
err := encoding.ValidateExpiryPolicy(p)
if err != nil {
return err
}
if l.source == nil {
l.source = new(source)
l.source = new(frames.Source)
}
l.source.ExpiryPolicy = p
@ -363,7 +364,7 @@ func LinkSourceExpiryPolicy(p ExpiryPolicy) LinkOption {
func LinkSourceTimeout(timeout uint32) LinkOption {
return func(l *link) error {
if l.source == nil {
l.source = new(source)
l.source = new(frames.Source)
}
l.source.Timeout = timeout

Просмотреть файл

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/Azure/go-amqp/internal/frames"
"github.com/stretchr/testify/require"
)
@ -64,7 +65,7 @@ func TestLinkFlowThatNeedsToReplenishCredits(t *testing.T) {
txFrame := <-l.session.tx
switch frame := txFrame.(type) {
case *performFlow:
case *frames.PerformFlow:
require.False(t, frame.Drain)
// replenished credits: l.receiver.maxCredit-uint32(l.countUnsettled())
require.EqualValues(t, 2, *frame.LinkCredit)
@ -132,7 +133,7 @@ func TestLinkFlowWithManualCreditor(t *testing.T) {
txFrame := <-l.session.tx
switch frame := txFrame.(type) {
case *performFlow:
case *frames.PerformFlow:
require.False(t, frame.Drain)
require.EqualValues(t, 100+1, *frame.LinkCredit)
default:
@ -155,7 +156,7 @@ func TestLinkFlowWithDrain(t *testing.T) {
txFrame := <-l.session.tx
switch frame := txFrame.(type) {
case *performFlow:
case *frames.PerformFlow:
require.True(t, frame.Drain)
require.EqualValues(t, 1, *frame.LinkCredit)
default:
@ -163,7 +164,7 @@ func TestLinkFlowWithDrain(t *testing.T) {
}
// simulate the return of the flow from the service
err := l.muxHandleFrame(&performFlow{
err := l.muxHandleFrame(&frames.PerformFlow{
Drain: true,
})
@ -195,17 +196,17 @@ func TestLinkFlowWithManualCreditorAndNoFlowNeeded(t *testing.T) {
func newTestLink(t *testing.T) *link {
l := &link{
source: &source{},
source: &frames.Source{},
receiver: &Receiver{
// adding just enough so the debug() print will still work...
// debug(1, "FLOW Link Mux half: source: %s, inflight: %d, credit: %d, deliveryCount: %d, messages: %d, unsettled: %d, maxCredit : %d, settleMode: %s", l.source.Address, len(l.receiver.inFlight.m), l.linkCredit, l.deliveryCount, len(l.messages), l.countUnsettled(), l.receiver.maxCredit, l.receiverSettleMode.String())
inFlight: inFlight{},
},
session: &Session{
tx: make(chan frameBody, 100),
tx: make(chan frames.FrameBody, 100),
done: make(chan struct{}),
},
rx: make(chan frameBody, 100),
rx: make(chan frames.FrameBody, 100),
receiverReady: make(chan struct{}, 1),
}

Просмотреть файл

@ -14,18 +14,19 @@ import (
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
var exampleFrames = []struct {
label string
frame frame
frame frames.Frame
}{
{
label: "transfer",
frame: frame{
type_: frameTypeAMQP,
channel: 10,
body: &performTransfer{
frame: frames.Frame{
Type: frameTypeAMQP,
Channel: 10,
Body: &frames.PerformTransfer{
Handle: 34983,
DeliveryID: uint32Ptr(564),
DeliveryTag: []byte("foo tag"),
@ -59,19 +60,19 @@ func TestFrameMarshalUnmarshal(t *testing.T) {
}
want := tt.frame
if header.Channel != want.channel {
t.Errorf("Expected channel to be %d, but it is %d", want.channel, header.Channel)
if header.Channel != want.Channel {
t.Errorf("Expected channel to be %d, but it is %d", want.Channel, header.Channel)
}
if header.FrameType != want.type_ {
t.Errorf("Expected channel to be %d, but it is %d", want.type_, header.FrameType)
if header.FrameType != want.Type {
t.Errorf("Expected channel to be %d, but it is %d", want.Type, header.FrameType)
}
payload, err := parseFrameBody(&buf)
if err != nil {
t.Fatalf("%+v", err)
}
if !testEqual(want.body, payload) {
t.Errorf("Roundtrip produced different results:\n %s", testDiff(want.body, payload))
if !testEqual(want.Body, payload) {
t.Errorf("Roundtrip produced different results:\n %s", testDiff(want.Body, payload))
}
})
}
@ -263,7 +264,7 @@ var (
remoteChannel = uint16(4321)
protoTypes = []interface{}{
&performOpen{
&frames.PerformOpen{
ContainerID: "foo",
Hostname: "bar.host",
MaxFrameSize: 4200,
@ -276,7 +277,7 @@ var (
"fooProp": int32(45),
},
},
&performBegin{
&frames.PerformBegin{
RemoteChannel: &remoteChannel,
NextOutgoingID: 730000,
IncomingWindow: 9876654,
@ -288,13 +289,13 @@ var (
"fooProp": int32(45),
},
},
&performAttach{
&frames.PerformAttach{
Name: "fooName",
Handle: 435982,
Role: roleSender,
Role: encoding.RoleSender,
SenderSettleMode: sndSettle(ModeMixed),
ReceiverSettleMode: rcvSettle(ModeSecond),
Source: &source{
Source: &frames.Source{
Address: "fooAddr",
Durable: DurabilityUnsettledState,
ExpiryPolicy: ExpiryLinkDetach,
@ -313,7 +314,7 @@ var (
Outcomes: []encoding.Symbol{"amqp:accepted:list"},
Capabilities: []encoding.Symbol{"barCap"},
},
Target: &target{
Target: &frames.Target{
Address: "fooAddr",
Durable: DurabilityUnsettledState,
ExpiryPolicy: ExpiryLinkDetach,
@ -336,11 +337,11 @@ var (
"fooProp": int32(45),
},
},
role(true),
encoding.Role(true),
&encoding.Unsettled{
"fooDeliveryTag": &encoding.StateAccepted{},
},
&source{
&frames.Source{
Address: "fooAddr",
Durable: DurabilityUnsettledState,
ExpiryPolicy: ExpiryLinkDetach,
@ -359,7 +360,7 @@ var (
Outcomes: []encoding.Symbol{"amqp:accepted:list"},
Capabilities: []encoding.Symbol{"barCap"},
},
&target{
&frames.Target{
Address: "fooAddr",
Durable: DurabilityUnsettledState,
ExpiryPolicy: ExpiryLinkDetach,
@ -370,7 +371,7 @@ var (
},
Capabilities: []encoding.Symbol{"barCap"},
},
&performFlow{
&frames.PerformFlow{
NextIncomingID: uint32Ptr(354),
IncomingWindow: 4352,
NextOutgoingID: 85324,
@ -385,7 +386,7 @@ var (
"fooProp": int32(45),
},
},
&performTransfer{
&frames.PerformTransfer{
Handle: 34983,
DeliveryID: uint32Ptr(564),
DeliveryTag: []byte("foo tag"),
@ -399,15 +400,15 @@ var (
Batchable: true,
Payload: []byte("very important payload"),
},
&performDisposition{
Role: roleSender,
&frames.PerformDisposition{
Role: encoding.RoleSender,
First: 5644444,
Last: uint32Ptr(423),
Settled: true,
State: &encoding.StateReleased{},
Batchable: true,
},
&performDetach{
&frames.PerformDetach{
Handle: 4352,
Closed: true,
Error: &Error{
@ -419,7 +420,7 @@ var (
},
},
},
&performDetach{
&frames.PerformDetach{
Handle: 4352,
Closed: true,
Error: &Error{
@ -443,7 +444,7 @@ var (
"and": uint16(875),
},
},
&performEnd{
&frames.PerformEnd{
Error: &Error{
Condition: ErrorNotAllowed,
Description: "foo description",
@ -453,7 +454,7 @@ var (
},
},
},
&performClose{
&frames.PerformClose{
Error: &Error{
Condition: ErrorNotAllowed,
Description: "foo description",
@ -552,22 +553,22 @@ var (
encoding.LifetimePolicy(encoding.TypeCodeDeleteOnClose),
SenderSettleMode(1),
ReceiverSettleMode(1),
&saslInit{
&frames.SASLInit{
Mechanism: "FOO",
InitialResponse: []byte("BAR\x00RESPONSE\x00"),
Hostname: "me",
},
&saslMechanisms{
&frames.SASLMechanisms{
Mechanisms: []encoding.Symbol{"FOO", "BAR", "BAZ"},
},
&saslChallenge{
&frames.SASLChallenge{
Challenge: []byte("BAR\x00CHALLENGE\x00"),
},
&saslResponse{
&frames.SASLResponse{
Response: []byte("BAR\x00RESPONSE\x00"),
},
&saslOutcome{
Code: codeSASLSysPerm,
&frames.SASLOutcome{
Code: encoding.CodeSASLSysPerm,
AdditionalData: []byte("here's some info for you..."),
},
encoding.Milliseconds(10 * time.Second),

Просмотреть файл

@ -7,6 +7,7 @@ import (
"time"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
type messageDisposition struct {
@ -53,7 +54,7 @@ func (r *Receiver) HandleMessage(ctx context.Context, handle func(*Message) erro
msg.receiver = r
// we only need to track message disposition for mode second
// spec : http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-receiver-settle-mode
if r.link.receiverSettleMode.value() == ModeSecond {
if receiverSettleModeValue(r.link.receiverSettleMode) == ModeSecond {
go trackCompletion(msg)
}
// tracks messages until exiting handler
@ -255,8 +256,8 @@ func (r *Receiver) dispositionBatcher() {
// sendDisposition sends a disposition frame to the peer
func (r *Receiver) sendDisposition(first uint32, last *uint32, state interface{}) error {
fr := &performDisposition{
Role: roleReceiver,
fr := &frames.PerformDisposition{
Role: encoding.RoleReceiver,
First: first,
Last: last,
Settled: r.link.receiverSettleMode == nil || *r.link.receiverSettleMode == ModeFirst,

59
sasl.go
Просмотреть файл

@ -3,15 +3,8 @@ package amqp
import (
"fmt"
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
)
// SASL Codes
const (
codeSASLOK saslCode = iota // Connection authentication succeeded.
codeSASLAuth // Connection authentication failed due to an unspecified problem with the supplied credentials.
codeSASLSysPerm // Connection authentication failed due to a system error that is unlikely to be corrected without intervention.
"github.com/Azure/go-amqp/internal/frames"
)
// SASL Mechanisms
@ -26,18 +19,6 @@ const (
frameTypeSASL = 0x1
)
type saslCode uint8
func (s saslCode) Marshal(wr *buffer.Buffer) error {
return encoding.Marshal(wr, uint8(s))
}
func (s *saslCode) Unmarshal(r *buffer.Buffer) error {
n, err := encoding.ReadUbyte(r)
*s = saslCode(n)
return err
}
// ConnSASLPlain enables SASL PLAIN authentication for the connection.
//
// SASL PLAIN transmits credentials in plain text and should only be used
@ -53,15 +34,15 @@ func ConnSASLPlain(username, password string) ConnOption {
// add the handler the the map
c.saslHandlers[saslMechanismPLAIN] = func() stateFunc {
// send saslInit with PLAIN payload
init := &saslInit{
init := &frames.SASLInit{
Mechanism: "PLAIN",
InitialResponse: []byte("\x00" + username + "\x00" + password),
Hostname: "",
}
debug(1, "TX: %s", init)
c.err = c.writeFrame(frame{
type_: frameTypeSASL,
body: init,
c.err = c.writeFrame(frames.Frame{
Type: frameTypeSASL,
Body: init,
})
if c.err != nil {
return nil
@ -84,14 +65,14 @@ func ConnSASLAnonymous() ConnOption {
// add the handler the the map
c.saslHandlers[saslMechanismANONYMOUS] = func() stateFunc {
init := &saslInit{
init := &frames.SASLInit{
Mechanism: saslMechanismANONYMOUS,
InitialResponse: []byte("anonymous"),
}
debug(1, "TX: %s", init)
c.err = c.writeFrame(frame{
type_: frameTypeSASL,
body: init,
c.err = c.writeFrame(frames.Frame{
Type: frameTypeSASL,
Body: init,
})
if c.err != nil {
return nil
@ -149,9 +130,9 @@ func (s saslXOAUTH2Handler) init() stateFunc {
if s.maxFrameSizeOverride > s.conn.peerMaxFrameSize {
s.conn.peerMaxFrameSize = s.maxFrameSizeOverride
}
s.conn.err = s.conn.writeFrame(frame{
type_: frameTypeSASL,
body: &saslInit{
s.conn.err = s.conn.writeFrame(frames.Frame{
Type: frameTypeSASL,
Body: &frames.SASLInit{
Mechanism: saslMechanismXOAUTH2,
InitialResponse: s.response,
},
@ -172,10 +153,10 @@ func (s saslXOAUTH2Handler) step() stateFunc {
return nil
}
switch v := fr.body.(type) {
case *saslOutcome:
switch v := fr.Body.(type) {
case *frames.SASLOutcome:
// check if auth succeeded
if v.Code != codeSASLOK {
if v.Code != encoding.CodeSASLOK {
s.conn.err = fmt.Errorf("SASL XOAUTH2 auth failed with code %#00x: %s : %s",
v.Code, v.AdditionalData, s.errorResponse)
return nil
@ -184,14 +165,14 @@ func (s saslXOAUTH2Handler) step() stateFunc {
// return to c.negotiateProto
s.conn.saslComplete = true
return s.conn.negotiateProto
case *saslChallenge:
case *frames.SASLChallenge:
if s.errorResponse == nil {
s.errorResponse = v.Challenge
// The SASL protocol requires clients to send an empty response to this challenge.
s.conn.err = s.conn.writeFrame(frame{
type_: frameTypeSASL,
body: &saslResponse{
s.conn.err = s.conn.writeFrame(frames.Frame{
Type: frameTypeSASL,
Body: &frames.SASLResponse{
Response: []byte{},
},
})
@ -202,7 +183,7 @@ func (s saslXOAUTH2Handler) step() stateFunc {
return nil
}
default:
s.conn.err = fmt.Errorf("unexpected frame type %T", fr.body)
s.conn.err = fmt.Errorf("unexpected frame type %T", fr.Body)
return nil
}
}

Просмотреть файл

@ -10,6 +10,7 @@ import (
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/testconn"
)
@ -79,21 +80,21 @@ func TestSaslXOAUTH2EmptyUsername(t *testing.T) {
func TestConnSASLXOAUTH2AuthSuccess(t *testing.T) {
buf, err := peerResponse(
[]byte("AMQP\x03\x01\x00\x00"),
frame{
type_: frameTypeSASL,
channel: 0,
body: &saslMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
frames.Frame{
Type: frameTypeSASL,
Channel: 0,
Body: &frames.SASLMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
},
frame{
type_: frameTypeSASL,
channel: 0,
body: &saslOutcome{Code: codeSASLOK},
frames.Frame{
Type: frameTypeSASL,
Channel: 0,
Body: &frames.SASLOutcome{Code: encoding.CodeSASLOK},
},
[]byte("AMQP\x00\x01\x00\x00"),
frame{
type_: frameTypeAMQP,
channel: 0,
body: &performOpen{},
frames.Frame{
Type: frameTypeAMQP,
Channel: 0,
Body: &frames.PerformOpen{},
},
)
@ -114,15 +115,15 @@ func TestConnSASLXOAUTH2AuthSuccess(t *testing.T) {
func TestConnSASLXOAUTH2AuthFail(t *testing.T) {
buf, err := peerResponse(
[]byte("AMQP\x03\x01\x00\x00"),
frame{
type_: frameTypeSASL,
channel: 0,
body: &saslMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
frames.Frame{
Type: frameTypeSASL,
Channel: 0,
Body: &frames.SASLMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
},
frame{
type_: frameTypeSASL,
channel: 0,
body: &saslOutcome{Code: codeSASLAuth},
frames.Frame{
Type: frameTypeSASL,
Channel: 0,
Body: &frames.SASLOutcome{Code: encoding.CodeSASLAuth},
},
)
@ -140,7 +141,7 @@ func TestConnSASLXOAUTH2AuthFail(t *testing.T) {
switch {
case err == nil:
t.Errorf("authentication is expected to fail ")
case !strings.Contains(err.Error(), fmt.Sprintf("code %#00x", codeSASLAuth)):
case !strings.Contains(err.Error(), fmt.Sprintf("code %#00x", encoding.CodeSASLAuth)):
t.Errorf("unexpected connection failure : %s", err)
}
}
@ -148,20 +149,20 @@ func TestConnSASLXOAUTH2AuthFail(t *testing.T) {
func TestConnSASLXOAUTH2AuthFailWithErrorResponse(t *testing.T) {
buf, err := peerResponse(
[]byte("AMQP\x03\x01\x00\x00"),
frame{
type_: frameTypeSASL,
channel: 0,
body: &saslMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
frames.Frame{
Type: frameTypeSASL,
Channel: 0,
Body: &frames.SASLMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
},
frame{
type_: frameTypeSASL,
channel: 0,
body: &saslChallenge{Challenge: []byte("{ \"status\":\"401\", \"schemes\":\"bearer\", \"scope\":\"https://mail.google.com/\" }")},
frames.Frame{
Type: frameTypeSASL,
Channel: 0,
Body: &frames.SASLChallenge{Challenge: []byte("{ \"status\":\"401\", \"schemes\":\"bearer\", \"scope\":\"https://mail.google.com/\" }")},
},
frame{
type_: frameTypeSASL,
channel: 0,
body: &saslOutcome{Code: codeSASLAuth},
frames.Frame{
Type: frameTypeSASL,
Channel: 0,
Body: &frames.SASLOutcome{Code: encoding.CodeSASLAuth},
},
)
@ -179,7 +180,7 @@ func TestConnSASLXOAUTH2AuthFailWithErrorResponse(t *testing.T) {
switch {
case err == nil:
t.Errorf("authentication is expected to fail ")
case !strings.Contains(err.Error(), fmt.Sprintf("code %#00x", codeSASLAuth)):
case !strings.Contains(err.Error(), fmt.Sprintf("code %#00x", encoding.CodeSASLAuth)):
t.Errorf("unexpected connection failure : %s", err)
}
}
@ -187,20 +188,20 @@ func TestConnSASLXOAUTH2AuthFailWithErrorResponse(t *testing.T) {
func TestConnSASLXOAUTH2AuthFailsAdditionalErrorResponse(t *testing.T) {
buf, err := peerResponse(
[]byte("AMQP\x03\x01\x00\x00"),
frame{
type_: frameTypeSASL,
channel: 0,
body: &saslMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
frames.Frame{
Type: frameTypeSASL,
Channel: 0,
Body: &frames.SASLMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
},
frame{
type_: frameTypeSASL,
channel: 0,
body: &saslChallenge{Challenge: []byte("fail1")},
frames.Frame{
Type: frameTypeSASL,
Channel: 0,
Body: &frames.SASLChallenge{Challenge: []byte("fail1")},
},
frame{
type_: frameTypeSASL,
channel: 0,
body: &saslChallenge{Challenge: []byte("fail2")},
frames.Frame{
Type: frameTypeSASL,
Channel: 0,
Body: &frames.SASLChallenge{Challenge: []byte("fail2")},
},
)
@ -227,7 +228,7 @@ func peerResponse(items ...interface{}) ([]byte, error) {
buf := make([]byte, 0)
for _, item := range items {
switch v := item.(type) {
case frame:
case frames.Frame:
b := &buffer.Buffer{}
e := writeFrame(b, v)
if e != nil {

Просмотреть файл

@ -9,6 +9,7 @@ import (
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
// Sender sends messages on a single AMQP link.
@ -99,7 +100,7 @@ func (s *Sender) send(ctx context.Context, msg *Message) (chan encoding.Delivery
s.nextDeliveryTag++
}
fr := performTransfer{
fr := frames.PerformTransfer{
Handle: s.link.handle,
DeliveryID: &deliveryID,
DeliveryTag: deliveryTag,
@ -121,7 +122,7 @@ func (s *Sender) send(ctx context.Context, msg *Message) (chan encoding.Delivery
fr.Settled = senderSettled
// set done on last frame
fr.done = make(chan encoding.DeliveryState, 1)
fr.Done = make(chan encoding.DeliveryState, 1)
}
select {
@ -138,7 +139,7 @@ func (s *Sender) send(ctx context.Context, msg *Message) (chan encoding.Delivery
fr.MessageFormat = nil
}
return fr.done, nil
return fr.Done, nil
}
// Address returns the link's address.

Просмотреть файл

@ -7,6 +7,7 @@ import (
"sync"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
// Session is an AMQP session.
@ -16,9 +17,9 @@ type Session struct {
channel uint16 // session's local channel
remoteChannel uint16 // session's remote channel, owned by conn.mux
conn *conn // underlying conn
rx chan frame // frames destined for this session are sent on this chan by conn.mux
tx chan frameBody // non-transfer frames to be sent; session must track disposition
txTransfer chan *performTransfer // transfer frames to be sent; session must track disposition
rx chan frames.Frame // frames destined for this session are sent on this chan by conn.mux
tx chan frames.FrameBody // non-transfer frames to be sent; session must track disposition
txTransfer chan *frames.PerformTransfer // transfer frames to be sent; session must track disposition
// flow control
incomingWindow uint32
@ -41,9 +42,9 @@ func newSession(c *conn, channel uint16) *Session {
return &Session{
conn: c,
channel: channel,
rx: make(chan frame),
tx: make(chan frameBody),
txTransfer: make(chan *performTransfer),
rx: make(chan frames.Frame),
tx: make(chan frames.FrameBody),
txTransfer: make(chan *frames.PerformTransfer),
incomingWindow: DefaultWindow,
outgoingWindow: DefaultWindow,
handleMax: DefaultMaxLinks - 1,
@ -73,12 +74,12 @@ func (s *Session) Close(ctx context.Context) error {
// txFrame sends a frame to the connWriter.
// it returns an error if the connection has been closed.
func (s *Session) txFrame(p frameBody, done chan encoding.DeliveryState) error {
return s.conn.wantWriteFrame(frame{
type_: frameTypeAMQP,
channel: s.channel,
body: p,
done: done,
func (s *Session) txFrame(p frames.FrameBody, done chan encoding.DeliveryState) error {
return s.conn.wantWriteFrame(frames.Frame{
Type: frameTypeAMQP,
Channel: s.channel,
Body: p,
Done: done,
})
}
@ -122,7 +123,7 @@ func (s *Session) NewSender(opts ...LinkOption) (*Sender, error) {
return &Sender{link: l}, nil
}
func (s *Session) mux(remoteBegin *performBegin) {
func (s *Session) mux(remoteBegin *frames.PerformBegin) {
defer func() {
// clean up session record in conn.mux()
select {
@ -173,14 +174,14 @@ func (s *Session) mux(remoteBegin *performBegin) {
// session is being closed by user
case <-s.close:
_ = s.txFrame(&performEnd{}, nil)
_ = s.txFrame(&frames.PerformEnd{}, nil)
// discard frames until End is received or conn closed
EndLoop:
for {
select {
case fr := <-s.rx:
_, ok := fr.body.(*performEnd)
_, ok := fr.Body.(*frames.PerformEnd)
if ok {
break EndLoop
}
@ -221,12 +222,12 @@ func (s *Session) mux(remoteBegin *performBegin) {
// incoming frame for link
case fr := <-s.rx:
debug(1, "RX(Session): %s", fr.body)
debug(1, "RX(Session): %s", fr.Body)
switch body := fr.body.(type) {
switch body := fr.Body.(type) {
// Disposition frames can reference transfers from more than one
// link. Send this frame to all of them.
case *performDisposition:
case *frames.PerformDisposition:
start := body.First
end := start
if body.Last != nil {
@ -234,7 +235,7 @@ func (s *Session) mux(remoteBegin *performBegin) {
}
for deliveryID := start; deliveryID <= end; deliveryID++ {
handles := handlesByDeliveryID
if body.Role == roleSender {
if body.Role == encoding.RoleSender {
handles = handlesByRemoteDeliveryID
}
@ -244,7 +245,7 @@ func (s *Session) mux(remoteBegin *performBegin) {
}
delete(handles, deliveryID)
if body.Settled && body.Role == roleReceiver {
if body.Settled && body.Role == encoding.RoleReceiver {
// check if settlement confirmation was requested, if so
// confirm by closing channel
if done, ok := settlementByDeliveryID[deliveryID]; ok {
@ -262,15 +263,15 @@ func (s *Session) mux(remoteBegin *performBegin) {
continue
}
s.muxFrameToLink(link, fr.body)
s.muxFrameToLink(link, fr.Body)
}
continue
case *performFlow:
case *frames.PerformFlow:
if body.NextIncomingID == nil {
// This is a protocol error:
// "[...] MUST be set if the peer has received
// the begin frame for the session"
_ = s.txFrame(&performEnd{
_ = s.txFrame(&frames.PerformEnd{
Error: &Error{
Condition: ErrorNotAllowed,
Description: "next-incoming-id not set after session established",
@ -306,13 +307,13 @@ func (s *Session) mux(remoteBegin *performBegin) {
continue
}
s.muxFrameToLink(link, fr.body)
s.muxFrameToLink(link, fr.Body)
continue
}
if body.Echo {
niID := nextIncomingID
resp := &performFlow{
resp := &frames.PerformFlow{
NextIncomingID: &niID,
IncomingWindow: s.incomingWindow,
NextOutgoingID: nextOutgoingID,
@ -322,7 +323,7 @@ func (s *Session) mux(remoteBegin *performBegin) {
_ = s.txFrame(resp, nil)
}
case *performAttach:
case *frames.PerformAttach:
// On Attach response link should be looked up by name, then added
// to the links map with the remote's handle contained in this
// attach frame.
@ -336,9 +337,9 @@ func (s *Session) mux(remoteBegin *performBegin) {
link.remoteHandle = body.Handle
links[link.remoteHandle] = link
s.muxFrameToLink(link, fr.body)
s.muxFrameToLink(link, fr.Body)
case *performTransfer:
case *frames.PerformTransfer:
// "Upon receiving a transfer, the receiving endpoint will
// increment the next-incoming-id to match the implicit
// transfer-id of the incoming transfer plus one, as well
@ -356,7 +357,7 @@ func (s *Session) mux(remoteBegin *performBegin) {
select {
case <-s.conn.done:
case link.rx <- fr.body:
case link.rx <- fr.Body:
}
// if this message is received unsettled and link rcv-settle-mode == second, add to handlesByRemoteDeliveryID
@ -369,7 +370,7 @@ func (s *Session) mux(remoteBegin *performBegin) {
// Update peer's outgoing window if half has been consumed.
if remoteOutgoingWindow < s.incomingWindow/2 {
nID := nextIncomingID
flow := &performFlow{
flow := &frames.PerformFlow{
NextIncomingID: &nID,
IncomingWindow: s.incomingWindow,
NextOutgoingID: nextOutgoingID,
@ -379,15 +380,15 @@ func (s *Session) mux(remoteBegin *performBegin) {
_ = s.txFrame(flow, nil)
}
case *performDetach:
case *frames.PerformDetach:
link, ok := links[body.Handle]
if !ok {
continue
}
s.muxFrameToLink(link, fr.body)
s.muxFrameToLink(link, fr.Body)
case *performEnd:
_ = s.txFrame(&performEnd{}, nil)
case *frames.PerformEnd:
_ = s.txFrame(&frames.PerformEnd{}, nil)
s.err = fmt.Errorf("session ended by server: %s", body.Error)
return
@ -420,13 +421,13 @@ func (s *Session) mux(remoteBegin *performBegin) {
// if not settled, add done chan to map
// and clear from frame so conn doesn't close it.
if !fr.Settled && fr.done != nil {
settlementByDeliveryID[deliveryID] = fr.done
fr.done = nil
if !fr.Settled && fr.Done != nil {
settlementByDeliveryID[deliveryID] = fr.Done
fr.Done = nil
}
debug(2, "TX(Session) - txtransfer: %s", fr)
_ = s.txFrame(fr, fr.done)
_ = s.txFrame(fr, fr.Done)
// "Upon sending a transfer, the sending endpoint will increment
// its next-outgoing-id, decrement its remote-incoming-window,
@ -439,7 +440,7 @@ func (s *Session) mux(remoteBegin *performBegin) {
case fr := <-s.tx:
switch fr := fr.(type) {
case *performFlow:
case *frames.PerformFlow:
niID := nextIncomingID
fr.NextIncomingID = &niID
fr.IncomingWindow = s.incomingWindow
@ -447,7 +448,7 @@ func (s *Session) mux(remoteBegin *performBegin) {
fr.OutgoingWindow = s.outgoingWindow
debug(1, "TX(Session) - tx: %s", fr)
_ = s.txFrame(fr, nil)
case *performTransfer:
case *frames.PerformTransfer:
panic("transfer frames must use txTransfer")
default:
debug(1, "TX(Session) - default: %s", fr)
@ -457,7 +458,7 @@ func (s *Session) mux(remoteBegin *performBegin) {
}
}
func (s *Session) muxFrameToLink(l *link, fr frameBody) {
func (s *Session) muxFrameToLink(l *link, fr frames.FrameBody) {
select {
case l.rx <- fr:
case <-l.detached: