Moved some internal content to the internal directory (#189)

In preparation for some upcoming refactoring.
No functional/test changes.
This commit is contained in:
Joel Hendrix 2022-11-07 10:54:43 -08:00 коммит произвёл GitHub
Родитель 7adc55aa4e
Коммит fffbc152a9
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 145 добавлений и 130 удалений

Просмотреть файл

@ -2,11 +2,8 @@ package amqp
import (
"context"
"encoding/base64"
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/Azure/go-amqp/internal/debug"
@ -152,32 +149,6 @@ type SessionOptions struct {
MaxLinks uint32
}
// lockedRand provides a rand source that is safe for concurrent use.
type lockedRand struct {
mu sync.Mutex
src *rand.Rand
}
func (r *lockedRand) Read(p []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
return r.src.Read(p)
}
// package scoped rand source to avoid any issues with seeding
// of the global source.
var pkgRand = &lockedRand{
src: rand.New(rand.NewSource(time.Now().UnixNano())),
}
// randBytes returns a base64 encoded string of n bytes.
func randString(n int) string {
b := make([]byte, n)
// from math/rand, cannot fail
_, _ = pkgRand.Read(b)
return base64.RawURLEncoding.EncodeToString(b)
}
// linkKey uniquely identifies a link on a connection by name and direction.
//
// A link can be identified uniquely by the ordered tuple

Просмотреть файл

@ -17,6 +17,7 @@ import (
"github.com/Azure/go-amqp/internal/debug"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/shared"
)
// Default connection options
@ -220,7 +221,7 @@ func newConn(netConn net.Conn, opts *ConnOptions) (*conn, error) {
PeerMaxFrameSize: defaultMaxFrameSize,
channelMax: defaultMaxSessions - 1, // -1 because channel-max starts at zero
idleTimeout: defaultIdleTimeout,
containerID: randString(40),
containerID: shared.RandString(40),
Done: make(chan struct{}),
connErr: make(chan error, 2), // buffered to ensure connReader/Writer won't leak
closeMux: make(chan struct{}),
@ -684,7 +685,7 @@ func (c *conn) connWriter() {
cls := &frames.PerformClose{}
debug.Log(1, "TX (connWriter): %s", cls)
_ = c.writeFrame(frames.Frame{
Type: frameTypeAMQP,
Type: frames.TypeAMQP,
Body: cls,
})
return
@ -868,7 +869,7 @@ func (c *conn) openAMQP() (stateFunc, error) {
}
debug.Log(1, "TX (openAMQP): %s", open)
err := c.writeFrame(frames.Frame{
Type: frameTypeAMQP,
Type: frames.TypeAMQP,
Body: open,
Channel: 0,
})

Просмотреть файл

@ -11,6 +11,7 @@ import (
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/mocks"
"github.com/Azure/go-amqp/internal/test"
"github.com/stretchr/testify/require"
)
@ -38,7 +39,7 @@ func TestConnOptions(t *testing.T) {
"x-opt-test1": "test3",
"x-opt-test2": "test2",
}
if !testEqual(c.properties, wantProperties) {
if !test.Equal(c.properties, wantProperties) {
require.Equal(t, wantProperties, c.properties)
}
},
@ -401,7 +402,7 @@ func TestConnWriterError(t *testing.T) {
require.NoError(t, conn.Start())
// send a frame that our responder doesn't handle to simulate a conn.connWriter error
require.NoError(t, conn.SendFrame(frames.Frame{
Type: frameTypeAMQP,
Type: frames.TypeAMQP,
Body: &frames.PerformFlow{},
}))
// wait a bit for connReader to read from the mock

Просмотреть файл

@ -3,7 +3,6 @@ package amqp
import (
"context"
"fmt"
"reflect"
"sync/atomic"
"testing"
"time"
@ -11,68 +10,9 @@ import (
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/mocks"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
)
func testEqual(x, y interface{}) bool {
return cmp.Equal(x, y, compareOpts(x, y)...)
}
func testDiff(x, y interface{}) string {
return cmp.Diff(x, y, compareOpts(x, y)...)
}
func compareOpts(x, y interface{}) []cmp.Option {
return cmp.Options{
deepAllowUnexported(x, y),
cmpopts.EquateNaNs(),
}
}
// from https://github.com/google/go-cmp/issues/40
func deepAllowUnexported(vs ...interface{}) cmp.Option {
m := make(map[reflect.Type]struct{})
for _, v := range vs {
structTypes(reflect.ValueOf(v), m)
}
var types []interface{}
for t := range m {
types = append(types, reflect.New(t).Elem().Interface())
}
return cmp.AllowUnexported(types...)
}
func structTypes(v reflect.Value, m map[reflect.Type]struct{}) {
if !v.IsValid() {
return
}
switch v.Kind() {
case reflect.Ptr:
if !v.IsNil() {
structTypes(v.Elem(), m)
}
case reflect.Interface:
if !v.IsNil() {
structTypes(v.Elem(), m)
}
case reflect.Slice, reflect.Array:
for i := 0; i < v.Len(); i++ {
structTypes(v.Index(i), m)
}
case reflect.Map:
for _, k := range v.MapKeys() {
structTypes(v.MapIndex(k), m)
}
case reflect.Struct:
m[v.Type()] = struct{}{}
for i := 0; i < v.NumField(); i++ {
structTypes(v.Field(i), m)
}
}
}
func sendInitialFlowFrame(t *testing.T, netConn *mocks.NetConn, handle uint32, credit uint32) {
nextIncoming := uint32(0)
count := uint32(0)

Просмотреть файл

@ -10,6 +10,11 @@ import (
"github.com/Azure/go-amqp/internal/encoding"
)
const (
TypeAMQP = 0x0
TypeSASL = 0x1
)
/*
<type name="source" class="composite" source="list" provides="source">

34
internal/shared/shared.go Normal file
Просмотреть файл

@ -0,0 +1,34 @@
package shared
import (
"encoding/base64"
"math/rand"
"sync"
"time"
)
// lockedRand provides a rand source that is safe for concurrent use.
type lockedRand struct {
mu sync.Mutex
src *rand.Rand
}
func (r *lockedRand) Read(p []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
return r.src.Read(p)
}
// package scoped rand source to avoid any issues with seeding
// of the global source.
var pkgRand = &lockedRand{
src: rand.New(rand.NewSource(time.Now().UnixNano())),
}
// RandString returns a base64 encoded string of n bytes.
func RandString(n int) string {
b := make([]byte, n)
// from math/rand, cannot fail
_, _ = pkgRand.Read(b)
return base64.RawURLEncoding.EncodeToString(b)
}

65
internal/test/test.go Normal file
Просмотреть файл

@ -0,0 +1,65 @@
package test
import (
"reflect"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)
func Equal(x, y interface{}) bool {
return cmp.Equal(x, y, compareOpts(x, y)...)
}
func Diff(x, y interface{}) string {
return cmp.Diff(x, y, compareOpts(x, y)...)
}
func compareOpts(x, y interface{}) []cmp.Option {
return cmp.Options{
deepAllowUnexported(x, y),
cmpopts.EquateNaNs(),
}
}
// from https://github.com/google/go-cmp/issues/40
func deepAllowUnexported(vs ...interface{}) cmp.Option {
m := make(map[reflect.Type]struct{})
for _, v := range vs {
structTypes(reflect.ValueOf(v), m)
}
var types []interface{}
for t := range m {
types = append(types, reflect.New(t).Elem().Interface())
}
return cmp.AllowUnexported(types...)
}
func structTypes(v reflect.Value, m map[reflect.Type]struct{}) {
if !v.IsValid() {
return
}
switch v.Kind() {
case reflect.Ptr:
if !v.IsNil() {
structTypes(v.Elem(), m)
}
case reflect.Interface:
if !v.IsNil() {
structTypes(v.Elem(), m)
}
case reflect.Slice, reflect.Array:
for i := 0; i < v.Len(); i++ {
structTypes(v.Index(i), m)
}
case reflect.Map:
for _, k := range v.MapKeys() {
structTypes(v.MapIndex(k), m)
}
case reflect.Struct:
m[v.Type()] = struct{}{}
for i := 0; i < v.NumField(); i++ {
structTypes(v.Field(i), m)
}
}
}

Просмотреть файл

@ -12,6 +12,7 @@ import (
"github.com/Azure/go-amqp/internal/debug"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/shared"
)
// link is a unidirectional route.
@ -75,7 +76,7 @@ type link struct {
// newSendingLink creates a new sending link and attaches it to the session
func newSendingLink(target string, s *Session, opts *SenderOptions) (*link, error) {
l := &link{
Key: linkKey{randString(40), encoding.RoleSender},
Key: linkKey{shared.RandString(40), encoding.RoleSender},
Session: s,
close: make(chan struct{}),
Detached: make(chan struct{}),
@ -138,7 +139,7 @@ func newSendingLink(target string, s *Session, opts *SenderOptions) (*link, erro
func newReceivingLink(source string, s *Session, r *Receiver, opts *ReceiverOptions) (*link, error) {
l := &link{
Key: linkKey{randString(40), encoding.RoleReceiver},
Key: linkKey{shared.RandString(40), encoding.RoleReceiver},
Session: s,
receiver: r,
close: make(chan struct{}),

Просмотреть файл

@ -15,6 +15,7 @@ import (
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/test"
)
var exampleFrames = []struct {
@ -24,7 +25,7 @@ var exampleFrames = []struct {
{
label: "transfer",
frame: frames.Frame{
Type: frameTypeAMQP,
Type: frames.TypeAMQP,
Channel: 10,
Body: &frames.PerformTransfer{
Handle: 34983,
@ -71,8 +72,8 @@ func TestFrameMarshalUnmarshal(t *testing.T) {
if err != nil {
t.Fatalf("%+v", err)
}
if !testEqual(want.Body, payload) {
t.Errorf("Roundtrip produced different results:\n %s", testDiff(want.Body, payload))
if !test.Equal(want.Body, payload) {
t.Errorf("Roundtrip produced different results:\n %s", test.Diff(want.Body, payload))
}
})
}
@ -209,8 +210,8 @@ func TestMarshalUnmarshal(t *testing.T) {
return
}
cmpType := reflect.Indirect(newType).Interface()
if !testEqual(type_, cmpType) {
t.Errorf("Roundtrip produced different results:\n %s", testDiff(type_, cmpType))
if !test.Equal(type_, cmpType) {
t.Errorf("Roundtrip produced different results:\n %s", test.Diff(type_, cmpType))
}
})
}
@ -232,7 +233,7 @@ func TestIssue173(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if d := testDiff(want, got); d != "" {
if d := test.Diff(want, got); d != "" {
t.Fatal(d)
}
}
@ -251,8 +252,8 @@ func TestReadAny(t *testing.T) {
t.Fatalf("%+v", err)
}
if !testEqual(type_, got) {
t.Errorf("Roundtrip produced different results:\n %s", testDiff(type_, got))
if !test.Equal(type_, got) {
t.Errorf("Roundtrip produced different results:\n %s", test.Diff(type_, got))
}
})
}

15
sasl.go
Просмотреть файл

@ -16,11 +16,6 @@ const (
saslMechanismXOAUTH2 encoding.Symbol = "XOAUTH2"
)
const (
frameTypeAMQP = 0x0
frameTypeSASL = 0x1
)
// SASLType represents a SASL configuration to use during authentication.
type SASLType func(c *conn) error
@ -46,7 +41,7 @@ func SASLTypePlain(username, password string) SASLType {
}
debug.Log(1, "TX (ConnSASLPlain): %s", init)
err := c.writeFrame(frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Body: init,
})
if err != nil {
@ -76,7 +71,7 @@ func SASLTypeAnonymous() SASLType {
}
debug.Log(1, "TX (ConnSASLAnonymous): %s", init)
err := c.writeFrame(frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Body: init,
})
if err != nil {
@ -108,7 +103,7 @@ func SASLTypeExternal(resp string) SASLType {
}
debug.Log(1, "TX (ConnSASLExternal): %s", init)
err := c.writeFrame(frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Body: init,
})
if err != nil {
@ -168,7 +163,7 @@ func (s saslXOAUTH2Handler) init() (stateFunc, error) {
s.conn.PeerMaxFrameSize = s.maxFrameSizeOverride
}
err := s.conn.writeFrame(frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Body: &frames.SASLInit{
Mechanism: saslMechanismXOAUTH2,
InitialResponse: s.response,
@ -206,7 +201,7 @@ func (s saslXOAUTH2Handler) step() (stateFunc, error) {
// The SASL protocol requires clients to send an empty response to this challenge.
err := s.conn.writeFrame(frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Body: &frames.SASLResponse{
Response: []byte{},
},

Просмотреть файл

@ -11,6 +11,7 @@ import (
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/test"
"github.com/Azure/go-amqp/internal/testconn"
)
@ -30,7 +31,7 @@ func TestSaslXOAUTH2InitialResponse(t *testing.T) {
}
if !bytes.Equal(wantedResp, gotResp) {
t.Errorf("Initial response does not match expected:\n %s", testDiff(gotResp, wantedResp))
t.Errorf("Initial response does not match expected:\n %s", test.Diff(gotResp, wantedResp))
}
}
@ -81,18 +82,18 @@ func TestConnSASLXOAUTH2AuthSuccess(t *testing.T) {
buf, err := peerResponse(
[]byte("AMQP\x03\x01\x00\x00"),
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
},
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLOutcome{Code: encoding.CodeSASLOK},
},
[]byte("AMQP\x00\x01\x00\x00"),
frames.Frame{
Type: frameTypeAMQP,
Type: frames.TypeAMQP,
Channel: 0,
Body: &frames.PerformOpen{},
},
@ -117,12 +118,12 @@ func TestConnSASLXOAUTH2AuthFail(t *testing.T) {
buf, err := peerResponse(
[]byte("AMQP\x03\x01\x00\x00"),
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
},
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLOutcome{Code: encoding.CodeSASLAuth},
},
@ -152,17 +153,17 @@ func TestConnSASLXOAUTH2AuthFailWithErrorResponse(t *testing.T) {
buf, err := peerResponse(
[]byte("AMQP\x03\x01\x00\x00"),
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
},
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLChallenge{Challenge: []byte("{ \"status\":\"401\", \"schemes\":\"bearer\", \"scope\":\"https://mail.google.com/\" }")},
},
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLOutcome{Code: encoding.CodeSASLAuth},
},
@ -192,17 +193,17 @@ func TestConnSASLXOAUTH2AuthFailsAdditionalErrorResponse(t *testing.T) {
buf, err := peerResponse(
[]byte("AMQP\x03\x01\x00\x00"),
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLMechanisms{Mechanisms: []encoding.Symbol{saslMechanismXOAUTH2}},
},
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLChallenge{Challenge: []byte("fail1")},
},
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLChallenge{Challenge: []byte("fail2")},
},
@ -232,18 +233,18 @@ func TestConnSASLExternal(t *testing.T) {
buf, err := peerResponse(
[]byte("AMQP\x03\x01\x00\x00"),
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLMechanisms{Mechanisms: []encoding.Symbol{saslMechanismEXTERNAL}},
},
frames.Frame{
Type: frameTypeSASL,
Type: frames.TypeSASL,
Channel: 0,
Body: &frames.SASLOutcome{Code: encoding.CodeSASLOK},
},
[]byte("AMQP\x00\x01\x00\x00"),
frames.Frame{
Type: frameTypeAMQP,
Type: frames.TypeAMQP,
Channel: 0,
Body: &frames.PerformOpen{},
},

Просмотреть файл

@ -115,7 +115,7 @@ func (s *Session) Close(ctx context.Context) error {
// it returns an error if the connection has been closed.
func (s *Session) txFrame(p frames.FrameBody, done chan encoding.DeliveryState) error {
return s.conn.SendFrame(frames.Frame{
Type: frameTypeAMQP,
Type: frames.TypeAMQP,
Channel: s.channel,
Body: p,
Done: done,