Make rpc.go so it can use a single link and send/receive multiple messages (#52)

Service Bus and Event Hubs both allow you to send multiple messages on the management link. We weren't using this and it meant that we were creating a separate link _per_ request.

This PR changes that so we pass in a unique message ID for each request, which allows us to demux the requests in the client, thus only using a single link.

This is how the other track 2 SDKs currently handle it.
This commit is contained in:
Richard Park 2021-08-26 15:54:35 -07:00 коммит произвёл GitHub
Родитель 6b7304f631
Коммит bff5bef799
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 494 добавлений и 35 удалений

8
go.mod
Просмотреть файл

@ -3,11 +3,11 @@ module github.com/Azure/azure-amqp-common-go/v3
go 1.12
require (
github.com/Azure/go-amqp v0.13.0
github.com/Azure/go-autorest/autorest v0.11.3
github.com/Azure/go-autorest/autorest/adal v0.9.0
github.com/Azure/go-amqp v0.13.11
github.com/Azure/go-autorest/autorest v0.11.18
github.com/Azure/go-autorest/autorest/adal v0.9.13
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/devigned/tab v0.1.1
github.com/stretchr/testify v1.6.1
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
)

38
go.sum
Просмотреть файл

@ -1,17 +1,23 @@
github.com/Azure/go-amqp v0.13.0 h1:Iu9KCsgiWK7RNFlL0EkfvlulVX5tm8x9jSxnUUtPYEI=
github.com/Azure/go-amqp v0.13.0/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/go-amqp v0.13.11 h1:E28zKoWuzO4+D80iUD88BUorI5PqvIZ/S/77md3hIvA=
github.com/Azure/go-amqp v0.13.11/go.mod h1:D5ZrjQqB1dyp1A+G73xeL/kNn7D5qHJIIsNNps7YNmk=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/TmQd5sSI5u2Ws=
github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
github.com/Azure/go-autorest/autorest/adal v0.9.0 h1:SigMbuFNuKgc1xcGhaeapbh+8fgsu+GxgDRFyg7f5lM=
github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA=
github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
github.com/Azure/go-autorest/autorest/mocks v0.4.0 h1:z20OWOSG5aCye0HEkDp6TPmP17ZcfeMxPi6HnSALa8c=
github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/logger v0.2.0 h1:e4RVHVZKC5p6UANLJHkM4OfR1UKZPj8Wt8Pcx+3oqrE=
github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac=
github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -19,8 +25,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
@ -30,15 +36,13 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

Просмотреть файл

@ -33,7 +33,7 @@ import (
"github.com/devigned/tab"
"github.com/Azure/azure-amqp-common-go/v3"
common "github.com/Azure/azure-amqp-common-go/v3"
"github.com/Azure/azure-amqp-common-go/v3/internal/tracing"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/go-amqp"
@ -48,14 +48,23 @@ const (
type (
// Link is the bidirectional communication structure used for CBS negotiation
Link struct {
session *amqp.Session
receiver *amqp.Receiver
sender *amqp.Sender
session *amqp.Session
receiver amqpReceiver // *amqp.Receiver
sender amqpSender // *amqp.Sender
clientAddress string
rpcMu sync.Mutex
sessionID *string
useSessionID bool
id string
responseMu sync.Mutex
startResponseRouterOnce *sync.Once
responseMap map[string]chan rpcResponse
// for unit tests
uuidNewV4 func() (uuid.UUID, error)
messageAccept func(message *amqp.Message, ctx context.Context) error
}
// Response is the simplified response structure from an RPC like call
@ -67,6 +76,22 @@ type (
// LinkOption provides a way to customize the construction of a Link
LinkOption func(link *Link) error
rpcResponse struct {
message *amqp.Message
err error
}
// Actually: *amqp.Receiver
amqpReceiver interface {
Receive(ctx context.Context) (*amqp.Message, error)
Close(ctx context.Context) error
}
amqpSender interface {
Send(ctx context.Context, msg *amqp.Message) error
Close(ctx context.Context) error
}
)
// LinkWithSessionFilter configures a Link to use a session filter
@ -100,6 +125,11 @@ func NewLinkWithSession(session *amqp.Session, address string, opts ...LinkOptio
session: session,
clientAddress: strings.Replace("$", "", address, -1) + replyPostfix + id,
id: id,
uuidNewV4: uuid.NewV4,
messageAccept: (*amqp.Message).Accept,
responseMap: map[string]chan rpcResponse{},
startResponseRouterOnce: &sync.Once{},
}
for _, opt := range opts {
@ -157,6 +187,7 @@ func (l *Link) RetryableRPC(ctx context.Context, times int, delay time.Duration,
defer span.End()
res, err := l.RPC(ctx, msg)
if err != nil {
tab.For(ctx).Error(fmt.Errorf("error in RPC via link %s: %v", l.id, err))
return nil, err
@ -183,19 +214,64 @@ func (l *Link) RetryableRPC(ctx context.Context, times int, delay time.Duration,
return res.(*Response), nil
}
// startResponseRouter is responsible for taking any messages received on the 'response'
// link and forwarding it to the proper channel. The channel is being select'd by the
// original `RPC` call.
func (l *Link) startResponseRouter() {
for {
res, err := l.receiver.Receive(context.Background())
// You'll see this when the link is shutting down (either
// service-initiated via 'detach' or a user-initiated shutdown)
if isClosedError(err) {
l.broadcastError(err)
break
}
// I don't believe this should happen. The JS version of this same code
// ignores errors as well since responses should always be correlated
// to actual send requests. So this is just here for completeness.
if res == nil {
continue
}
autogenMessageId, ok := res.Properties.CorrelationID.(string)
if !ok {
// TODO: it'd be good to track these in some way. We don't have a good way to
// forward this on at this point.
continue
}
ch := l.deleteFromMap(autogenMessageId)
// there's no legitimate case where this should be nil - purely defensive.
if ch != nil {
ch <- rpcResponse{message: res, err: err}
}
}
}
// RPC sends a request and waits on a response for that request
func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
const altStatusCodeKey, altDescriptionKey = "statusCode", "statusDescription"
l.startResponseRouterOnce.Do(func() {
go l.startResponseRouter()
})
l.rpcMu.Lock()
defer l.rpcMu.Unlock()
copiedMessage, messageID, err := addMessageID(msg, l.uuidNewV4)
if err != nil {
return nil, err
}
// use the copiedMessage from this point
msg = copiedMessage
const altStatusCodeKey, altDescriptionKey = "statusCode", "statusDescription"
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RPC")
defer span.End()
if msg.Properties == nil {
msg.Properties = &amqp.MessageProperties{}
}
msg.Properties.ReplyTo = l.clientAddress
if msg.ApplicationProperties == nil {
@ -208,13 +284,28 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
}
}
err := l.sender.Send(ctx, msg)
responseCh := l.addChannelToMap(messageID)
err = l.sender.Send(ctx, msg)
if err != nil {
l.deleteFromMap(messageID)
tab.For(ctx).Error(err)
return nil, err
}
res, err := l.receiver.Receive(ctx)
var res *amqp.Message
select {
case <-ctx.Done():
l.deleteFromMap(messageID)
res, err = nil, ctx.Err()
case resp := <-responseCh:
// this will get triggered by the loop in 'startReceiverRouter' when it receives
// a message with our autoGenMessageID set in the correlation_id property.
res, err = resp.message, resp.err
}
if err != nil {
tab.For(ctx).Error(err)
return nil, err
@ -260,7 +351,7 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
Message: res,
}
if err := res.Accept(ctx); err != nil {
if err := l.messageAccept(res, ctx); err != nil {
tab.For(ctx).Error(err)
return response, err
}
@ -316,3 +407,77 @@ func (l *Link) closeSession(ctx context.Context) error {
}
return nil
}
func (l *Link) addChannelToMap(messageID string) chan rpcResponse {
l.responseMu.Lock()
defer l.responseMu.Unlock()
responseCh := make(chan rpcResponse, 1)
l.responseMap[messageID] = responseCh
return responseCh
}
func (l *Link) deleteFromMap(messageID string) chan rpcResponse {
l.responseMu.Lock()
defer l.responseMu.Unlock()
ch := l.responseMap[messageID]
delete(l.responseMap, messageID)
return ch
}
// broadcastError notifies the anyone waiting for a response that the link/session/connection
// has closed.
func (l *Link) broadcastError(err error) {
l.responseMu.Lock()
defer l.responseMu.Unlock()
for _, ch := range l.responseMap {
ch <- rpcResponse{err: err}
}
l.responseMap = nil
}
// addMessageID generates a unique UUID for the message. When the service
// responds it will fill out the correlation ID property of the response
// with this ID, allowing us to link the request and response together.
//
// NOTE: this function copies 'message', adding in a 'Properties' object
// if it does not already exist.
func addMessageID(message *amqp.Message, uuidNewV4 func() (uuid.UUID, error)) (*amqp.Message, string, error) {
uuid, err := uuidNewV4()
if err != nil {
return nil, "", err
}
autoGenMessageID := uuid.String()
// we need to modify the message so we'll make a copy
copiedMessage := *message
if message.Properties == nil {
copiedMessage.Properties = &amqp.MessageProperties{
MessageID: autoGenMessageID,
}
} else {
// properties already exist, make a copy and then update
// the message ID
copiedProperties := *message.Properties
copiedProperties.MessageID = autoGenMessageID
copiedMessage.Properties = &copiedProperties
}
return &copiedMessage, autoGenMessageID, nil
}
func isClosedError(err error) bool {
return errors.Is(err, amqp.ErrLinkClosed) ||
errors.Is(err, amqp.ErrLinkDetached) ||
errors.Is(err, amqp.ErrConnClosed) ||
errors.Is(err, amqp.ErrSessionClosed)
}

290
rpc/rpc_test.go Normal file
Просмотреть файл

@ -0,0 +1,290 @@
package rpc
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/go-amqp"
"github.com/stretchr/testify/require"
)
func TestResponseRouterBasic(t *testing.T) {
receiver := &fakeReceiver{
Responses: []rpcResponse{
{amqpMessageWithCorrelationId("my message id"), nil},
{nil, amqp.ErrLinkClosed},
},
}
link := &Link{
responseMap: map[string]chan rpcResponse{
"my message id": make(chan rpcResponse, 1),
},
receiver: receiver,
}
ch := link.responseMap["my message id"]
link.startResponseRouter()
result := <-ch
require.EqualValues(t, result.message.Data[0], []byte("ID was my message id"))
require.Empty(t, receiver.Responses)
require.Nil(t, link.responseMap, "Response map is nil'd out after we get a closed error")
}
func TestResponseRouterMissingMessageID(t *testing.T) {
receiver := &fakeReceiver{
Responses: []rpcResponse{
{amqpMessageWithCorrelationId("my message id"), nil},
{nil, amqp.ErrLinkClosed},
},
}
link := &Link{
responseMap: map[string]chan rpcResponse{},
receiver: receiver,
}
link.startResponseRouter()
require.Empty(t, receiver.Responses)
}
func TestResponseRouterBadCorrelationID(t *testing.T) {
messageWithBadCorrelationID := &amqp.Message{
Properties: &amqp.MessageProperties{
CorrelationID: uint64(1),
},
}
receiver := &fakeReceiver{
Responses: []rpcResponse{
{messageWithBadCorrelationID, nil},
{nil, amqp.ErrLinkClosed},
},
}
link := &Link{
responseMap: map[string]chan rpcResponse{},
receiver: receiver,
}
link.startResponseRouter()
require.Empty(t, receiver.Responses)
}
func TestResponseRouterFatalErrors(t *testing.T) {
fatalErrors := []error{
amqp.ErrLinkClosed,
amqp.ErrLinkDetached,
amqp.ErrConnClosed,
amqp.ErrSessionClosed,
}
for _, fatalError := range fatalErrors {
t.Run(fatalError.Error(), func(t *testing.T) {
receiver := &fakeReceiver{
Responses: []rpcResponse{
{nil, fatalError},
},
}
sentinelCh := make(chan rpcResponse, 1)
link := &Link{
responseMap: map[string]chan rpcResponse{
"sentinel": sentinelCh,
},
receiver: receiver,
}
link.startResponseRouter()
require.Empty(t, receiver.Responses)
// also, we should have broadcasted that the link is closed to anyone else
// that had not yet received a response but was still waiting.
select {
case rpcResponse := <-sentinelCh:
require.Error(t, rpcResponse.err, fatalError.Error())
require.Nil(t, rpcResponse.message)
case <-time.After(time.Second * 5):
require.Fail(t, "sentinel channel should have received a message")
}
})
}
}
func TestResponseRouterNoResponse(t *testing.T) {
receiver := &fakeReceiver{
Responses: []rpcResponse{
{nil, errors.New("Some other error that will get ignored since we can't route it to anyone (ie: no message ID)")},
{nil, amqp.ErrConnClosed},
},
}
link := &Link{
responseMap: map[string]chan rpcResponse{},
receiver: receiver,
}
link.startResponseRouter()
require.Empty(t, receiver.Responses)
}
func TestAddMessageID(t *testing.T) {
message, id, err := addMessageID(&amqp.Message{}, uuid.NewV4)
require.NoError(t, err)
require.NotEmpty(t, id)
require.EqualValues(t, message.Properties.MessageID, id)
m := &amqp.Message{
Data: [][]byte{[]byte("hello world")},
Properties: &amqp.MessageProperties{
UserID: []byte("my user ID"),
MessageID: "is that will not be copied"},
}
message, id, err = addMessageID(m, uuid.NewV4)
require.NoError(t, err)
require.NotEmpty(t, id)
require.EqualValues(t, message.Properties.MessageID, id)
require.EqualValues(t, message.Properties.UserID, []byte("my user ID"))
require.EqualValues(t, message.Data[0], []byte("hello world"))
}
func TestRPCBasic(t *testing.T) {
fakeUUID := uuid.UUID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
replyMessage := amqpMessageWithCorrelationId(fakeUUID.String())
replyMessage.ApplicationProperties = map[string]interface{}{
"status-code": int32(200),
}
ch := make(chan struct{})
sender := &fakeSender{ch: ch}
receiver := &fakeReceiver{
Responses: []rpcResponse{
{replyMessage, nil},
{nil, amqp.ErrConnClosed},
},
ch: ch,
}
l := &Link{
receiver: receiver,
sender: sender,
startResponseRouterOnce: &sync.Once{},
responseMap: map[string]chan rpcResponse{},
uuidNewV4: func() (uuid.UUID, error) {
return fakeUUID, nil
},
messageAccept: func(message *amqp.Message, ctx context.Context) error {
return nil
},
}
messageToSend := &amqp.Message{}
resp, err := l.RPC(context.Background(), messageToSend)
require.NoError(t, err)
require.EqualValues(t, fakeUUID.String(), sender.Sent[0].Properties.MessageID, "Sent message contains a uniquely generated ID")
require.EqualValues(t, fakeUUID.String(), resp.Message.Properties.CorrelationID, "Correlation ID matches our originally sent message")
require.Nil(t, replyMessage.Properties.MessageID, "Original message not modified")
}
func TestRPCFailedSend(t *testing.T) {
// important bit is that we clean up the channel we stored in the map
// since we're no longer waiting for the response.
fakeUUID := uuid.UUID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
replyMessage := amqpMessageWithCorrelationId(fakeUUID.String())
replyMessage.ApplicationProperties = map[string]interface{}{
"status-code": int32(200),
}
ch := make(chan struct{})
sender := &fakeSender{
ch: ch,
}
receiver := &fakeReceiver{
ch: ch,
Responses: []rpcResponse{
{nil, amqp.ErrConnClosed},
},
}
l := &Link{
receiver: receiver,
sender: sender,
startResponseRouterOnce: &sync.Once{},
responseMap: map[string]chan rpcResponse{},
uuidNewV4: func() (uuid.UUID, error) {
return fakeUUID, nil
},
messageAccept: func(message *amqp.Message, ctx context.Context) error {
panic("Should not be called")
},
}
messageToSend := &amqp.Message{}
cancelledContext, cancel := context.WithCancel(context.Background())
cancel()
resp, err := l.RPC(cancelledContext, messageToSend)
require.Nil(t, resp)
require.EqualError(t, err, context.Canceled.Error())
require.EqualValues(t, fakeUUID.String(), sender.Sent[0].Properties.MessageID, "Sent message contains a uniquely generated ID")
}
func amqpMessageWithCorrelationId(id string) *amqp.Message {
return &amqp.Message{
Data: [][]byte{[]byte(fmt.Sprintf("ID was %s", id))},
Properties: &amqp.MessageProperties{
CorrelationID: id,
},
}
}
type fakeReceiver struct {
Responses []rpcResponse
ch <-chan struct{}
}
func (fr *fakeReceiver) Receive(ctx context.Context) (*amqp.Message, error) {
// wait until the actual send if we're simulating request/response
if fr.ch != nil {
<-fr.ch
}
resp := fr.Responses[0]
fr.Responses = fr.Responses[1:]
return resp.message, resp.err
}
func (fr *fakeReceiver) Close(ctx context.Context) error {
panic("Not used for this test")
}
type fakeSender struct {
Sent []*amqp.Message
ch chan<- struct{}
}
func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message) error {
s.Sent = append(s.Sent, msg)
if s.ch != nil {
close(s.ch)
}
return nil
}
func (fs *fakeSender) Close(ctx context.Context) error {
panic("Not used for this test")
}