fixes #106
This commit is contained in:
Родитель
a91c2d6393
Коммит
701f11e1bb
|
@ -79,4 +79,4 @@ func (m *Message) sendDisposition(ctx context.Context, dispositionStatus Message
|
|||
err = fmt.Errorf("unsupported bulk disposition status %q", dispositionStatus)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ func TestBatchDispositionIterator(t *testing.T) {
|
|||
|
||||
func TestBatchDispositionUnsupportedStatus(t *testing.T) {
|
||||
status := MessageStatus(suspendedDisposition)
|
||||
id := uuid.UUID{}
|
||||
id := uuid.UUID{}
|
||||
bdi := BatchDispositionIterator{
|
||||
LockTokenIDs: []*uuid.UUID{
|
||||
&id,
|
||||
|
|
15
errors.go
15
errors.go
|
@ -29,6 +29,11 @@ type (
|
|||
// ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be
|
||||
// more messages in the future.
|
||||
ErrNoMessages struct{}
|
||||
|
||||
// ErrNotFound is returned when an entity is not found (404)
|
||||
ErrNotFound struct{
|
||||
EntityPath string
|
||||
}
|
||||
)
|
||||
|
||||
func (e ErrMissingField) Error() string {
|
||||
|
@ -64,3 +69,13 @@ func (e ErrAMQP) Error() string {
|
|||
func (e ErrNoMessages) Error() string {
|
||||
return "no messages available"
|
||||
}
|
||||
|
||||
func (e ErrNotFound) Error() string {
|
||||
return fmt.Sprintf("entity at %s not found", e.EntityPath)
|
||||
}
|
||||
|
||||
// IsErrNotFound returns true if the error argument is an ErrNotFound type
|
||||
func IsErrNotFound(err error) bool {
|
||||
_, ok := err.(ErrNotFound)
|
||||
return ok
|
||||
}
|
||||
|
|
|
@ -1,9 +1,12 @@
|
|||
package servicebus
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestErrMissingField_Error(t *testing.T) {
|
||||
|
@ -57,3 +60,12 @@ func TestErrIncorrectType_Error(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestErrNotFound_Error(t *testing.T) {
|
||||
err := ErrNotFound{EntityPath: "/foo/bar"}
|
||||
assert.Equal(t, "entity at /foo/bar not found", err.Error())
|
||||
assert.True(t, IsErrNotFound(err))
|
||||
|
||||
otherErr := errors.New("foo")
|
||||
assert.False(t, IsErrNotFound(otherErr))
|
||||
}
|
||||
|
|
1
go.mod
1
go.mod
|
@ -15,5 +15,6 @@ require (
|
|||
github.com/uber/jaeger-lib v1.5.0 // indirect
|
||||
go.opencensus.io v0.15.0
|
||||
go.uber.org/atomic v1.3.2 // indirect
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect
|
||||
pack.ag/amqp v0.10.2
|
||||
)
|
||||
|
|
7
go.sum
7
go.sum
|
@ -40,11 +40,14 @@ go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
|
|||
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
|
||||
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM=
|
||||
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
pack.ag/amqp v0.8.0 h1:JT0f88Hsbo5D+s8bBdleDOHvMDoYcaBW6GplAUqtxC4=
|
||||
pack.ag/amqp v0.8.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
||||
pack.ag/amqp v0.10.1 h1:+NUHSIOCRt62A7+RXL/kPOlEeljIdrpte1HNgdhIn8w=
|
||||
pack.ag/amqp v0.10.1/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
||||
pack.ag/amqp v0.10.2 h1:tOg29Eqx2kmgcDJa7OAjH9N3jqGA1gHf5iIAnBMsa5U=
|
||||
pack.ag/amqp v0.10.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNamespaceWithUserAgentOption(t *testing.T) {
|
||||
userAgent := "custom-user-agent"
|
||||
nsUserAgentOption := NamespaceWithUserAgent(userAgent)
|
||||
|
@ -42,4 +43,4 @@ func TestNamespaceWithoutUserAgentOption(t *testing.T) {
|
|||
ns, err := NewNamespace(nsUserAgentOption)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, rootUserAgent, ns.getUserAgent())
|
||||
}
|
||||
}
|
||||
|
|
20
queue.go
20
queue.go
|
@ -640,22 +640,34 @@ func (q *Queue) Close(ctx context.Context) error {
|
|||
if q.receiver != nil {
|
||||
if err := q.receiver.Close(ctx); err != nil {
|
||||
if q.sender != nil {
|
||||
if err := q.sender.Close(ctx); err != nil {
|
||||
if err := q.sender.Close(ctx); err != nil && !isConnectionClosed(err) {
|
||||
log.For(ctx).Error(err)
|
||||
}
|
||||
}
|
||||
log.For(ctx).Error(err)
|
||||
return err
|
||||
|
||||
if !isConnectionClosed(err) {
|
||||
log.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if q.sender != nil {
|
||||
return q.sender.Close(ctx)
|
||||
err := q.sender.Close(ctx)
|
||||
if err != nil && !isConnectionClosed(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func isConnectionClosed(err error) bool {
|
||||
return err.Error() == "amqp: connection closed"
|
||||
}
|
||||
|
||||
func (q *Queue) newReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error) {
|
||||
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.NewReceiver")
|
||||
defer span.Finish()
|
||||
|
|
|
@ -345,7 +345,7 @@ func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, err
|
|||
}
|
||||
|
||||
if res.StatusCode == http.StatusNotFound {
|
||||
return nil, nil
|
||||
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(res.Body)
|
||||
|
@ -358,7 +358,7 @@ func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, err
|
|||
err = xml.Unmarshal(b, &entry)
|
||||
if err != nil {
|
||||
if isEmptyFeed(b) {
|
||||
return nil, nil
|
||||
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
|
||||
}
|
||||
return nil, formatManagementError(b)
|
||||
}
|
||||
|
|
|
@ -221,7 +221,17 @@ func (suite *serviceBusSuite) TestQueueUnmarshal() {
|
|||
suite.EqualValues(servicebus.EntityStatusActive, *q.Status)
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestQueueManagementWrites() {
|
||||
func (suite *serviceBusSuite) TestQueueManager_NotFound() {
|
||||
ns := suite.getNewSasInstance()
|
||||
qm := ns.NewQueueManager()
|
||||
entity, err := qm.Get(context.Background(), "somethingNotThere")
|
||||
suite.Nil(entity)
|
||||
suite.Require().NotNil(err)
|
||||
suite.True(IsErrNotFound(err))
|
||||
suite.Equal("entity at /somethingNotThere not found", err.Error())
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestQueueManagement_Writes() {
|
||||
tests := map[string]func(context.Context, *testing.T, *QueueManager, string){
|
||||
"TestPutDefaultQueue": testPutQueue,
|
||||
}
|
||||
|
@ -771,7 +781,7 @@ func (suite *serviceBusSuite) queueMessageTest(
|
|||
func makeQueue(ctx context.Context, t *testing.T, ns *Namespace, name string, opts ...QueueManagementOption) func() {
|
||||
qm := ns.NewQueueManager()
|
||||
entity, err := qm.Get(ctx, name)
|
||||
if !assert.NoError(t, err) {
|
||||
if err != nil && !IsErrNotFound(err) {
|
||||
assert.FailNow(t, "could not GET a queue entity")
|
||||
}
|
||||
|
||||
|
|
|
@ -273,7 +273,11 @@ func (s *Subscription) RenewLocks(ctx context.Context, messages ...*Message) err
|
|||
// Close the underlying connection to Service Bus
|
||||
func (s *Subscription) Close(ctx context.Context) error {
|
||||
if s.receiver != nil {
|
||||
return s.receiver.Close(ctx)
|
||||
err := s.receiver.Close(ctx)
|
||||
if err != nil && !isConnectionClosed(err) {
|
||||
log.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -281,7 +281,7 @@ func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*Subscript
|
|||
}
|
||||
|
||||
if res.StatusCode == http.StatusNotFound {
|
||||
return nil, nil
|
||||
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(res.Body)
|
||||
|
@ -293,7 +293,9 @@ func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*Subscript
|
|||
err = xml.Unmarshal(b, &entry)
|
||||
if err != nil {
|
||||
if isEmptyFeed(b) {
|
||||
return nil, nil
|
||||
// seems the only way to catch 404 is if the feed is empty. If no subscriptions exist, the GET returns 200
|
||||
// and an empty feed.
|
||||
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
|
||||
}
|
||||
return nil, formatManagementError(b)
|
||||
}
|
||||
|
@ -315,7 +317,7 @@ func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName s
|
|||
}
|
||||
|
||||
if res.StatusCode == http.StatusNotFound {
|
||||
return nil, nil
|
||||
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(res.Body)
|
||||
|
|
|
@ -95,7 +95,7 @@ const (
|
|||
</entry>`
|
||||
)
|
||||
|
||||
func (suite *serviceBusSuite) TestSubscriptionRuleEntryUnmarshal() {
|
||||
func (suite *serviceBusSuite) TestSubscriptionRuleEntry_Unmarshal() {
|
||||
var entry ruleEntry
|
||||
err := xml.Unmarshal([]byte(ruleEntryContent), &entry)
|
||||
suite.NoError(err)
|
||||
|
@ -108,7 +108,7 @@ func (suite *serviceBusSuite) TestSubscriptionRuleEntryUnmarshal() {
|
|||
suite.Equal("TrueFilter", entry.Content.RuleDescription.Filter.Type)
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestSubscriptionEntryUnmarshal() {
|
||||
func (suite *serviceBusSuite) TestSubscriptionEntry_Unmarshal() {
|
||||
var entry subscriptionEntry
|
||||
err := xml.Unmarshal([]byte(subscriptionEntryContent), &entry)
|
||||
suite.NoError(err)
|
||||
|
@ -120,7 +120,7 @@ func (suite *serviceBusSuite) TestSubscriptionEntryUnmarshal() {
|
|||
suite.Equal("PT1M", *entry.Content.SubscriptionDescription.LockDuration)
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestSubscriptionUnmarshal() {
|
||||
func (suite *serviceBusSuite) TestSubscriptionEntity_Unmarshal() {
|
||||
var entry subscriptionEntry
|
||||
err := xml.Unmarshal([]byte(subscriptionEntryContent), &entry)
|
||||
suite.NoError(err)
|
||||
|
@ -135,7 +135,18 @@ func (suite *serviceBusSuite) TestSubscriptionUnmarshal() {
|
|||
suite.EqualValues(servicebus.EntityStatusActive, *s.Status)
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestSubscriptionManagementWrites() {
|
||||
func (suite *serviceBusSuite) TestSubscriptionManager_NotFound() {
|
||||
ns := suite.getNewSasInstance()
|
||||
sm, err := ns.NewSubscriptionManager("foo")
|
||||
suite.Require().NoError(err)
|
||||
subEntity, err := sm.Get(context.Background(), "bar")
|
||||
suite.Nil(subEntity)
|
||||
suite.Require().NotNil(err)
|
||||
suite.True(IsErrNotFound(err))
|
||||
suite.Equal("entity at /foo/subscriptions/bar not found", err.Error())
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestSubscriptionManagement_Writes() {
|
||||
tests := map[string]func(context.Context, *testing.T, *SubscriptionManager, string){
|
||||
"TestPutDefaultSubscription": testPutSubscription,
|
||||
}
|
||||
|
@ -629,7 +640,7 @@ func (suite *serviceBusSuite) subscriptionMessageTest(tests map[string]func(cont
|
|||
func makeSubscription(ctx context.Context, t *testing.T, topic *Topic, name string, opts ...SubscriptionManagementOption) func() {
|
||||
sm := topic.NewSubscriptionManager()
|
||||
entity, err := sm.Get(ctx, name)
|
||||
if !assert.NoError(t, err) {
|
||||
if assert.Error(t, err) && !IsErrNotFound(err) {
|
||||
assert.FailNow(t, "could not GET a subscription")
|
||||
}
|
||||
|
||||
|
|
6
topic.go
6
topic.go
|
@ -127,7 +127,11 @@ func (t *Topic) Close(ctx context.Context) error {
|
|||
defer span.Finish()
|
||||
|
||||
if t.sender != nil {
|
||||
return t.sender.Close(ctx)
|
||||
err := t.sender.Close(ctx)
|
||||
if err != nil && !isConnectionClosed(err) {
|
||||
log.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -167,7 +167,7 @@ func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, err
|
|||
}
|
||||
|
||||
if res.StatusCode == http.StatusNotFound {
|
||||
return nil, nil
|
||||
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(res.Body)
|
||||
|
@ -180,7 +180,7 @@ func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, err
|
|||
err = xml.Unmarshal(b, &entry)
|
||||
if err != nil {
|
||||
if isEmptyFeed(b) {
|
||||
return nil, nil
|
||||
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
|
||||
}
|
||||
return nil, formatManagementError(b)
|
||||
}
|
||||
|
|
|
@ -76,7 +76,7 @@ const (
|
|||
</entry>`
|
||||
)
|
||||
|
||||
func (suite *serviceBusSuite) TestTopicEntryUnmarshal() {
|
||||
func (suite *serviceBusSuite) TestTopicEntry_Unmarshal() {
|
||||
var entry topicEntry
|
||||
err := xml.Unmarshal([]byte(topicEntry1), &entry)
|
||||
suite.Nil(err)
|
||||
|
@ -88,7 +88,7 @@ func (suite *serviceBusSuite) TestTopicEntryUnmarshal() {
|
|||
suite.NotNil(entry.Content)
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestTopicUnmarshal() {
|
||||
func (suite *serviceBusSuite) TestTopicEntryAndDescription_Unmarshal() {
|
||||
var entry atom.Entry
|
||||
err := xml.Unmarshal([]byte(topicEntry1), &entry)
|
||||
suite.Nil(err)
|
||||
|
@ -107,7 +107,17 @@ func (suite *serviceBusSuite) TestTopicUnmarshal() {
|
|||
suite.EqualValues(servicebus.EntityStatusActive, *td.Status)
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestTopicManagementWrites() {
|
||||
func (suite *serviceBusSuite) TestTopicManager_NotFound() {
|
||||
ns := suite.getNewSasInstance()
|
||||
tm := ns.NewTopicManager()
|
||||
subEntity, err := tm.Get(context.Background(), "bar")
|
||||
suite.Nil(subEntity)
|
||||
suite.Require().NotNil(err)
|
||||
suite.True(IsErrNotFound(err))
|
||||
suite.Equal("entity at /bar not found", err.Error())
|
||||
}
|
||||
|
||||
func (suite *serviceBusSuite) TestTopicManagement_Writes() {
|
||||
tests := map[string]func(context.Context, *testing.T, *TopicManager, string){
|
||||
"TestPutDefaultTopic": testPutTopic,
|
||||
}
|
||||
|
@ -319,7 +329,7 @@ func testTopicSend(ctx context.Context, t *testing.T, topic *Topic) {
|
|||
func makeTopic(ctx context.Context, t *testing.T, ns *Namespace, name string, opts ...TopicManagementOption) func() {
|
||||
tm := ns.NewTopicManager()
|
||||
entity, err := tm.Get(ctx, name)
|
||||
if !assert.NoError(t, err) {
|
||||
if err != nil && !IsErrNotFound(err) {
|
||||
assert.FailNow(t, "could not GET a subscription")
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче