[azservicebus] Updating go-amqp, fixing several bugs (#16530)

Fixes two issues:
- go-amqp v0.17.0 update handles cancelling drain when the link detaches
- and now we can also handle empty session IDs! Test added and reflected in the public API contract.

Breaking changes:

- `Message` and `ReceivedMessage` API surface has changed to move more properties to be 'pointer to prop' rather than just the 'prop' so they can be properly optional. (this also fixed the 'empty session ID' issue)

Fixes #16340
Fixes #15163
This commit is contained in:
Richard Park 2021-12-08 14:56:49 -08:00 коммит произвёл GitHub
Родитель f1d504555a
Коммит 69171905c8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 145 добавлений и 74 удалений

Просмотреть файл

@ -1,15 +1,23 @@
# Release History
## 0.3.2 (Unreleased)
## 0.3.2 (2021-12-08)
### Features Added
- Enabling websocket support via `ClientOptions.NewWebSocketConn`. For an example, see the `ExampleNewClient_usingWebsockets` function in `example_client_test.go`.
- Enabling websocket support via `ClientOptions.NewWebSocketConn`. For an example, see the `ExampleNewClient_usingWebsockets`
function in `example_client_test.go`.
### Breaking Changes
- Message properties that come from the standard AMQP message have been made into pointers, to allow them to be
properly omitted (or indicate that they've been omitted) when sending and receiving.
### Bugs Fixed
- Session IDs can now be blank - prior to this release it would cause an error. PR#16530
- Drain will no longer hang if there is a link failure. Thanks to @flexarts for reporting this issue: PR#16530
- Attempting to settle messages received in ReceiveAndDelete mode would cause a panic. PR#16255
### Other Changes
- Removed legacy dependencies, resulting in a much smaller package.

Просмотреть файл

@ -6,8 +6,8 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.2
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.0.0-20211119222651-4d034a7609a7
github.com/Azure/go-amqp v0.16.4
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.0.0-20211208010914-2b10e91d237e
github.com/Azure/go-amqp v0.17.0
github.com/devigned/tab v0.1.1
github.com/joho/godotenv v1.3.0
github.com/jpillora/backoff v1.0.0

Просмотреть файл

@ -7,16 +7,17 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0/go.mod h1:GJzjM4SR9T0Ky
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.2 h1:rImM7Yjz9yUgpdxp3A4cZLm1JZuo4XbtIp2LrUZnwYw=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.2/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I=
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.0.0-20211119220622-083ee5254126 h1:vpj/kWJTCa5o+nLRWRktN+H8nsaxbzedyzxGpCnrjUE=
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.0.0-20211119220622-083ee5254126/go.mod h1:ZqzqhjFc5XwsoZkvHxXbh+fqUNr62DTGyMlKsLMZum8=
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.0.0-20211119222651-4d034a7609a7 h1:XlmCO1oiBqFpwM0nH40WN+l6NcC1m8OktaCLOfRzIU8=
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.0.0-20211119222651-4d034a7609a7/go.mod h1:ZqzqhjFc5XwsoZkvHxXbh+fqUNr62DTGyMlKsLMZum8=
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.0.0-20211208010914-2b10e91d237e h1:9n9b/dngBY5hfevx1jmEMbGvZGCcx1zAUaeYF8dk9Co=
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.0.0-20211208010914-2b10e91d237e/go.mod h1:7hMUlcqiMXDUJtU1EWQlhhkC4BfIr6pEsiyuRYq4xLQ=
github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v0.16.4 h1:/1oIXrq5zwXLHaoYDliJyiFjJSpJZMWGgtMX9e0/Z30=
github.com/Azure/go-amqp v0.16.4/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v0.16.5-0.20211207190606-01b4c6402182 h1:E1rCzk/cEGhcedyB0l3ImnNNRCztZq51ataskdhCno8=
github.com/Azure/go-amqp v0.16.5-0.20211207190606-01b4c6402182/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest v0.11.22/go.mod h1:BAWYUWGPEtKPzjVkp0Q6an0MJcJDsoh5Z1BFAEFs4Xs=
github.com/Azure/go-autorest/autorest/adal v0.9.14/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/adal v0.9.17/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ=
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
@ -50,6 +51,7 @@ github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84=
github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
@ -105,11 +107,13 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6 h1:Z04ewVs7JhXaYkmDhBERPi41gnltfQpMWDnTnQbaCqk=
golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
@ -121,6 +125,7 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211001092434-39dca1131b70 h1:pGleJoyD1yA5HfvuaksHxD0404gsEkNDerKsQ0N0y1s=
golang.org/x/sys v0.0.0-20211001092434-39dca1131b70/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

Просмотреть файл

@ -143,7 +143,7 @@ func TestAMQPLinksRecovery(t *testing.T) {
createLinkCalled = 0
// let's do just a link level one
require.NoError(t, links.RecoverIfNeeded(ctx, links.revision+1, amqp.ErrLinkDetached), amqp.ErrLinkDetached.Error())
require.NoError(t, links.RecoverIfNeeded(ctx, links.revision+1, &amqp.DetachError{}), &amqp.DetachError{})
require.EqualValues(t, 0, ns.recovered)
require.EqualValues(t, 1, sender.Closed)
require.EqualValues(t, 1, createLinkCalled)
@ -161,7 +161,7 @@ func TestAMQPLinksRecovery(t *testing.T) {
createLinkCalled = 0
// cancellation overrides any other logic.
require.Error(t, links.RecoverIfNeeded(ctx, links.revision+1, amqp.ErrLinkDetached), amqp.ErrLinkDetached.Error())
require.Error(t, links.RecoverIfNeeded(ctx, links.revision+1, &amqp.DetachError{}), &amqp.DetachError{})
require.EqualValues(t, 0, ns.recovered)
require.EqualValues(t, 0, sender.Closed)
require.EqualValues(t, 0, createLinkCalled)

Просмотреть файл

@ -185,7 +185,9 @@ func shouldRecreateLink(err error) bool {
return false
}
return errors.Is(err, amqp.ErrLinkDetached) ||
var detachError *amqp.DetachError
return errors.As(err, &detachError) ||
// TODO: proper error types needs to happen
strings.Contains(err.Error(), "detach frame link detached")
}

Просмотреть файл

@ -125,7 +125,7 @@ func Test_isRetryableAMQPError(t *testing.T) {
func Test_shouldRecreateLink(t *testing.T) {
require.False(t, shouldRecreateLink(nil))
require.True(t, shouldRecreateLink(amqp.ErrLinkDetached))
require.True(t, shouldRecreateLink(&amqp.DetachError{}))
// going to treat these as "connection troubles" and throw them into the
// connection recovery scenario instead.

Просмотреть файл

@ -649,9 +649,7 @@ func (mc *mgmtClient) ScheduleMessages(ctx context.Context, enqueueTime time.Tim
"message": encoded,
}
// TODO: I believe empty string should be allowed here. There isn't a way for the
// user to opt out of session related information.
if messages[i].Properties.GroupID != "" {
if messages[i].Properties.GroupID != nil {
individualMessage["session-id"] = messages[i].Properties.GroupID
}

Просмотреть файл

@ -28,6 +28,8 @@ func CreateTempQueue(remainingArgs []string) int {
return 1
}
_ = shared.LoadEnvironment()
_, adminClient, err := clientCreator()
if err != nil {

Просмотреть файл

@ -10,7 +10,6 @@ import (
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
)
@ -19,13 +18,13 @@ import (
type ReceivedMessage struct {
MessageID string
ContentType string
CorrelationID string
ContentType *string
CorrelationID *string
SessionID *string
Subject string
ReplyTo string
ReplyToSessionID string
To string
Subject *string
ReplyTo *string
ReplyToSessionID *string
To *string
TimeToLive *time.Duration
@ -69,19 +68,19 @@ func (rm *ReceivedMessage) Body() ([]byte, error) {
return rm.rawAMQPMessage.Data[0], nil
}
// Message is a SendableMessage which can be sent using a Client.NewSender().
// Message is a message with a body and commonly used properties.
type Message struct {
MessageID string
MessageID *string
ContentType string
CorrelationID string
ContentType *string
CorrelationID *string
// Body corresponds to the first []byte array in the Data section of an AMQP message.
Body []byte
SessionID *string
Subject string
ReplyTo string
ReplyToSessionID string
To string
Subject *string
ReplyTo *string
ReplyToSessionID *string
To *string
TimeToLive *time.Duration
PartitionKey *string
@ -117,16 +116,10 @@ func (m *Message) toAMQPMessage() *amqp.Message {
amqpMsg.Header.TTL = *m.TimeToLive
}
// TODO: I don't think this should be strictly required. Need to
// look into why it won't send properly without one.
var messageID = m.MessageID
var messageID interface{}
if messageID == "" {
uuid, err := uuid.New()
if err == nil {
messageID = uuid.String()
}
if m.MessageID != nil {
messageID = *m.MessageID
}
amqpMsg.Properties = &amqp.MessageProperties{
@ -134,14 +127,17 @@ func (m *Message) toAMQPMessage() *amqp.Message {
}
if m.SessionID != nil {
amqpMsg.Properties.GroupID = *m.SessionID
amqpMsg.Properties.GroupID = m.SessionID
}
// if m.GroupSequence != nil {
// amqpMsg.Properties.GroupSequence = *m.GroupSequence
// }
amqpMsg.Properties.CorrelationID = m.CorrelationID
if m.CorrelationID != nil {
amqpMsg.Properties.CorrelationID = *m.CorrelationID
}
amqpMsg.Properties.ContentType = m.ContentType
amqpMsg.Properties.Subject = m.Subject
amqpMsg.Properties.To = m.To
@ -206,11 +202,11 @@ func newReceivedMessage(ctxForLogging context.Context, amqpMsg *amqp.Message) *R
if id, ok := amqpMsg.Properties.MessageID.(string); ok {
msg.MessageID = id
}
msg.SessionID = &amqpMsg.Properties.GroupID
msg.SessionID = amqpMsg.Properties.GroupID
//msg.GroupSequence = &amqpMsg.Properties.GroupSequence
if id, ok := amqpMsg.Properties.CorrelationID.(string); ok {
msg.CorrelationID = id
msg.CorrelationID = &id
}
msg.ContentType = amqpMsg.Properties.ContentType
msg.Subject = amqpMsg.Properties.Subject

Просмотреть файл

@ -19,7 +19,7 @@ func TestMessageBatchUnitTests(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 1, mb.NumMessages())
require.EqualValues(t, 195, mb.NumBytes())
require.EqualValues(t, 183, mb.NumBytes())
})
t.Run("addTooManyMessages", func(t *testing.T) {

Просмотреть файл

@ -20,12 +20,12 @@ func TestMessageUnitTest(t *testing.T) {
// basic thing - it's totally fine to send a message nothing in it.
amqpMessage := message.toAMQPMessage()
require.Empty(t, amqpMessage.Annotations)
require.NotEmpty(t, amqpMessage.Properties.MessageID, "MessageID is (currently) automatically filled out if you don't specify one")
require.Nil(t, amqpMessage.Properties.MessageID)
scheduledEnqueuedTime := time.Now()
message = &Message{
MessageID: "message id",
MessageID: to.StringPtr("message id"),
Body: []byte("the body"),
PartitionKey: to.StringPtr("partition key"),
TransactionPartitionKey: to.StringPtr("via partition key"),
@ -36,7 +36,7 @@ func TestMessageUnitTest(t *testing.T) {
amqpMessage = message.toAMQPMessage()
require.EqualValues(t, "message id", amqpMessage.Properties.MessageID)
require.EqualValues(t, "session id", amqpMessage.Properties.GroupID)
require.EqualValues(t, "session id", *amqpMessage.Properties.GroupID)
require.EqualValues(t, "the body", string(amqpMessage.Data[0]))
require.EqualValues(t, 1, len(amqpMessage.Data))
@ -93,21 +93,23 @@ func TestAMQPMessageToMessage(t *testing.T) {
// test the conversion occurs correctly.
dotNetEncodedLockTokenGUID := []byte{205, 89, 49, 187, 254, 253, 77, 205, 162, 38, 172, 76, 45, 235, 91, 225}
groupSequence := uint32(1)
amqpMsg := &amqp.Message{
DeliveryTag: dotNetEncodedLockTokenGUID,
Properties: &amqp.MessageProperties{
MessageID: "messageID",
To: "to",
Subject: "subject",
ReplyTo: "replyTo",
ReplyToGroupID: "replyToGroupID",
To: to.StringPtr("to"),
Subject: to.StringPtr("subject"),
ReplyTo: to.StringPtr("replyTo"),
ReplyToGroupID: to.StringPtr("replyToGroupID"),
CorrelationID: "correlationID",
ContentType: "contentType",
ContentEncoding: "contentEncoding",
AbsoluteExpiryTime: until,
CreationTime: until,
GroupID: "groupID",
GroupSequence: uint32(1),
ContentType: to.StringPtr("contentType"),
ContentEncoding: to.StringPtr("contentEncoding"),
AbsoluteExpiryTime: &until,
CreationTime: &until,
GroupID: to.StringPtr("groupID"),
GroupSequence: &groupSequence,
},
Annotations: amqp.Annotations{
"x-opt-locked-until": until,
@ -133,9 +135,9 @@ func TestAMQPMessageToMessage(t *testing.T) {
msg := newReceivedMessage(context.Background(), amqpMsg)
require.EqualValues(t, msg.MessageID, amqpMsg.Properties.MessageID, "messageID")
require.EqualValues(t, *msg.SessionID, amqpMsg.Properties.GroupID, "groupID")
require.EqualValues(t, msg.SessionID, amqpMsg.Properties.GroupID, "groupID")
require.EqualValues(t, msg.ContentType, amqpMsg.Properties.ContentType, "contentType")
require.EqualValues(t, msg.CorrelationID, amqpMsg.Properties.CorrelationID, "correlation")
require.EqualValues(t, *msg.CorrelationID, amqpMsg.Properties.CorrelationID, "correlation")
require.EqualValues(t, msg.ReplyToSessionID, amqpMsg.Properties.ReplyToGroupID, "replyToGroupID")
require.EqualValues(t, msg.ReplyTo, amqpMsg.Properties.ReplyTo, "replyTo")
require.EqualValues(t, *msg.TimeToLive, amqpMsg.Header.TTL, "ttl")

Просмотреть файл

@ -14,6 +14,39 @@ import (
"github.com/stretchr/testify/require"
)
func Test_Sender_MessageID(t *testing.T) {
client, cleanup, queueName := setupLiveTest(t, &admin.QueueProperties{
EnablePartitioning: to.BoolPtr(true),
})
defer cleanup()
sender, err := client.NewSender(queueName, nil)
require.NoError(t, err)
receiver, err := client.NewReceiverForQueue(queueName, &ReceiverOptions{
ReceiveMode: ReceiveModeReceiveAndDelete,
})
require.NoError(t, err)
err = sender.SendMessage(context.Background(), &Message{
MessageID: to.StringPtr("message with a message ID"),
})
require.NoError(t, err)
messages, err := receiver.ReceiveMessages(context.Background(), 1, nil)
require.NoError(t, err)
require.EqualValues(t, "message with a message ID", messages[0].MessageID)
err = sender.SendMessage(context.Background(), &Message{
// note if you don't explicitly send a message ID one will be auto-generated for you.
})
require.NoError(t, err)
messages, err = receiver.ReceiveMessages(context.Background(), 1, nil)
require.NoError(t, err)
require.NotEmpty(t, messages[0].MessageID) // this is filled in by automatically.
}
func Test_Sender_SendBatchOfTwo(t *testing.T) {
client, cleanup, queueName := setupLiveTest(t, nil)
defer cleanup()
@ -110,7 +143,7 @@ func Test_Sender_UsingPartitionedQueue(t *testing.T) {
require.NoError(t, err)
err = sender.SendMessage(context.Background(), &Message{
MessageID: "message ID",
MessageID: to.StringPtr("message ID"),
Body: []byte("1. single partitioned message"),
PartitionKey: to.StringPtr("partitionKey1"),
})

Просмотреть файл

@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
@ -59,9 +60,6 @@ func TestSessionReceiver_acceptSession(t *testing.T) {
}
func TestSessionReceiver_blankSessionIDs(t *testing.T) {
t.Skip("Can't run blank session ID test because of issue")
// errors while closing links: amqp sender close error: *Error{Condition: amqp:not-allowed, Description: The SessionId was not set on a message, and it cannot be sent to the entity. Entities that have session support enabled can only receive messages that have the SessionId set to a valid value.
client, cleanup, queueName := setupLiveTest(t, &admin.QueueProperties{
RequiresSession: to.BoolPtr(true),
})
@ -79,17 +77,44 @@ func TestSessionReceiver_blankSessionIDs(t *testing.T) {
})
require.NoError(t, err)
sequenceNumbers, err := sender.ScheduleMessages(ctx, []*Message{{
Body: []byte("session-based message"),
SessionID: to.StringPtr(""),
}}, time.Now())
require.NoError(t, err)
require.NotEmpty(t, sequenceNumbers)
// start a receiver with the "" session ID
receiver, err := client.AcceptSessionForQueue(ctx, queueName, "", nil)
require.NoError(t, err)
require.EqualValues(t, "", receiver.SessionID())
msg, err := receiver.inner.receiveMessage(ctx, nil)
require.NoError(t, err)
var received []*ReceivedMessage
require.EqualValues(t, "session-based message", msg.Body)
require.EqualValues(t, "", *msg.SessionID)
require.NoError(t, receiver.CompleteMessage(ctx, msg))
receiveCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
require.EqualValues(t, "session-1", receiver.SessionID())
for {
messages, err := receiver.ReceiveMessages(receiveCtx, 2, nil)
require.NoError(t, err)
for _, msg := range messages {
require.NoError(t, receiver.CompleteMessage(ctx, msg))
received = append(received, msg)
}
if len(received) == 2 {
break
}
}
for _, msg := range received {
body, err := msg.Body()
require.NoError(t, err)
require.EqualValues(t, "", *msg.SessionID)
require.EqualValues(t, "session-based message", string(body))
}
}
func TestSessionReceiver_acceptSessionButAlreadyLocked(t *testing.T) {