Merge pull request #107 from Azure/not-found

Add support for NotFound error in mgmt API
This commit is contained in:
David Justice 2019-03-12 17:06:11 -07:00 коммит произвёл GitHub
Родитель a91c2d6393 e4fe45999d
Коммит 3088c983e6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
20 изменённых файлов: 124 добавлений и 32 удалений

Просмотреть файл

@ -1,8 +1,8 @@
language: go
sudo: false
go:
- 1.11.x
- 1.x
- 1.11.x
before_install:
- cd ${TRAVIS_HOME}

Просмотреть файл

@ -79,4 +79,4 @@ func (m *Message) sendDisposition(ctx context.Context, dispositionStatus Message
err = fmt.Errorf("unsupported bulk disposition status %q", dispositionStatus)
}
return err
}
}

Просмотреть файл

@ -33,7 +33,7 @@ func TestBatchDispositionIterator(t *testing.T) {
func TestBatchDispositionUnsupportedStatus(t *testing.T) {
status := MessageStatus(suspendedDisposition)
id := uuid.UUID{}
id := uuid.UUID{}
bdi := BatchDispositionIterator{
LockTokenIDs: []*uuid.UUID{
&id,

Просмотреть файл

@ -1,6 +1,13 @@
# Change Log
## `head`
## `v0.3.0`
- Add disposition batching
- Add NotFound errors for mgmt API
- Fix go routine leak when listening for messages upon context close
## `v0.2.0`
- Refactor disposition handler so that errors can be handled in handlers
- Add dead letter queues for entities
- Fix connection leaks when using multiple calls to Receive

Просмотреть файл

@ -29,6 +29,11 @@ type (
// ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be
// more messages in the future.
ErrNoMessages struct{}
// ErrNotFound is returned when an entity is not found (404)
ErrNotFound struct {
EntityPath string
}
)
func (e ErrMissingField) Error() string {
@ -64,3 +69,13 @@ func (e ErrAMQP) Error() string {
func (e ErrNoMessages) Error() string {
return "no messages available"
}
func (e ErrNotFound) Error() string {
return fmt.Sprintf("entity at %s not found", e.EntityPath)
}
// IsErrNotFound returns true if the error argument is an ErrNotFound type
func IsErrNotFound(err error) bool {
_, ok := err.(ErrNotFound)
return ok
}

Просмотреть файл

@ -1,9 +1,12 @@
package servicebus
import (
"errors"
"fmt"
"reflect"
"testing"
"github.com/stretchr/testify/assert"
)
func TestErrMissingField_Error(t *testing.T) {
@ -57,3 +60,12 @@ func TestErrIncorrectType_Error(t *testing.T) {
})
}
}
func TestErrNotFound_Error(t *testing.T) {
err := ErrNotFound{EntityPath: "/foo/bar"}
assert.Equal(t, "entity at /foo/bar not found", err.Error())
assert.True(t, IsErrNotFound(err))
otherErr := errors.New("foo")
assert.False(t, IsErrNotFound(otherErr))
}

1
go.mod
Просмотреть файл

@ -15,5 +15,6 @@ require (
github.com/uber/jaeger-lib v1.5.0 // indirect
go.opencensus.io v0.15.0
go.uber.org/atomic v1.3.2 // indirect
golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect
pack.ag/amqp v0.10.2
)

7
go.sum
Просмотреть файл

@ -40,11 +40,14 @@ go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
pack.ag/amqp v0.8.0 h1:JT0f88Hsbo5D+s8bBdleDOHvMDoYcaBW6GplAUqtxC4=
pack.ag/amqp v0.8.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
pack.ag/amqp v0.10.1 h1:+NUHSIOCRt62A7+RXL/kPOlEeljIdrpte1HNgdhIn8w=
pack.ag/amqp v0.10.1/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
pack.ag/amqp v0.10.2 h1:tOg29Eqx2kmgcDJa7OAjH9N3jqGA1gHf5iIAnBMsa5U=
pack.ag/amqp v0.10.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=

Просмотреть файл

@ -45,7 +45,7 @@ const (
//`
// Version is the semantic version number
Version = "0.2.0"
Version = "0.3.0"
rootUserAgent = "/golang-service-bus"
)

Просмотреть файл

@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestNamespaceWithUserAgentOption(t *testing.T) {
userAgent := "custom-user-agent"
nsUserAgentOption := NamespaceWithUserAgent(userAgent)
@ -42,4 +43,4 @@ func TestNamespaceWithoutUserAgentOption(t *testing.T) {
ns, err := NewNamespace(nsUserAgentOption)
assert.Nil(t, err)
assert.Equal(t, rootUserAgent, ns.getUserAgent())
}
}

Просмотреть файл

@ -640,22 +640,34 @@ func (q *Queue) Close(ctx context.Context) error {
if q.receiver != nil {
if err := q.receiver.Close(ctx); err != nil {
if q.sender != nil {
if err := q.sender.Close(ctx); err != nil {
if err := q.sender.Close(ctx); err != nil && !isConnectionClosed(err) {
log.For(ctx).Error(err)
}
}
log.For(ctx).Error(err)
return err
if !isConnectionClosed(err) {
log.For(ctx).Error(err)
return err
}
return nil
}
}
if q.sender != nil {
return q.sender.Close(ctx)
err := q.sender.Close(ctx)
if err != nil && !isConnectionClosed(err) {
return err
}
}
return nil
}
func isConnectionClosed(err error) bool {
return err.Error() == "amqp: connection closed"
}
func (q *Queue) newReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error) {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.NewReceiver")
defer span.Finish()

Просмотреть файл

@ -39,7 +39,7 @@ func ExampleQueue_getOrBuildQueue() {
qm := ns.NewQueueManager()
qe, err := qm.Get(ctx, queueName)
if err != nil {
if err != nil && !servicebus.IsErrNotFound(err) {
fmt.Println(err)
return
}

Просмотреть файл

@ -345,7 +345,7 @@ func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, err
}
if res.StatusCode == http.StatusNotFound {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
b, err := ioutil.ReadAll(res.Body)
@ -358,7 +358,7 @@ func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, err
err = xml.Unmarshal(b, &entry)
if err != nil {
if isEmptyFeed(b) {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
return nil, formatManagementError(b)
}

Просмотреть файл

@ -221,7 +221,17 @@ func (suite *serviceBusSuite) TestQueueUnmarshal() {
suite.EqualValues(servicebus.EntityStatusActive, *q.Status)
}
func (suite *serviceBusSuite) TestQueueManagementWrites() {
func (suite *serviceBusSuite) TestQueueManager_NotFound() {
ns := suite.getNewSasInstance()
qm := ns.NewQueueManager()
entity, err := qm.Get(context.Background(), "somethingNotThere")
suite.Nil(entity)
suite.Require().NotNil(err)
suite.True(IsErrNotFound(err))
suite.Equal("entity at /somethingNotThere not found", err.Error())
}
func (suite *serviceBusSuite) TestQueueManagement_Writes() {
tests := map[string]func(context.Context, *testing.T, *QueueManager, string){
"TestPutDefaultQueue": testPutQueue,
}
@ -771,7 +781,7 @@ func (suite *serviceBusSuite) queueMessageTest(
func makeQueue(ctx context.Context, t *testing.T, ns *Namespace, name string, opts ...QueueManagementOption) func() {
qm := ns.NewQueueManager()
entity, err := qm.Get(ctx, name)
if !assert.NoError(t, err) {
if err != nil && !IsErrNotFound(err) {
assert.FailNow(t, "could not GET a queue entity")
}

Просмотреть файл

@ -273,7 +273,11 @@ func (s *Subscription) RenewLocks(ctx context.Context, messages ...*Message) err
// Close the underlying connection to Service Bus
func (s *Subscription) Close(ctx context.Context) error {
if s.receiver != nil {
return s.receiver.Close(ctx)
err := s.receiver.Close(ctx)
if err != nil && !isConnectionClosed(err) {
log.For(ctx).Error(err)
return err
}
}
return nil
}

Просмотреть файл

@ -281,7 +281,7 @@ func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*Subscript
}
if res.StatusCode == http.StatusNotFound {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
b, err := ioutil.ReadAll(res.Body)
@ -293,7 +293,9 @@ func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*Subscript
err = xml.Unmarshal(b, &entry)
if err != nil {
if isEmptyFeed(b) {
return nil, nil
// seems the only way to catch 404 is if the feed is empty. If no subscriptions exist, the GET returns 200
// and an empty feed.
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
return nil, formatManagementError(b)
}
@ -315,7 +317,7 @@ func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName s
}
if res.StatusCode == http.StatusNotFound {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
b, err := ioutil.ReadAll(res.Body)

Просмотреть файл

@ -95,7 +95,7 @@ const (
</entry>`
)
func (suite *serviceBusSuite) TestSubscriptionRuleEntryUnmarshal() {
func (suite *serviceBusSuite) TestSubscriptionRuleEntry_Unmarshal() {
var entry ruleEntry
err := xml.Unmarshal([]byte(ruleEntryContent), &entry)
suite.NoError(err)
@ -108,7 +108,7 @@ func (suite *serviceBusSuite) TestSubscriptionRuleEntryUnmarshal() {
suite.Equal("TrueFilter", entry.Content.RuleDescription.Filter.Type)
}
func (suite *serviceBusSuite) TestSubscriptionEntryUnmarshal() {
func (suite *serviceBusSuite) TestSubscriptionEntry_Unmarshal() {
var entry subscriptionEntry
err := xml.Unmarshal([]byte(subscriptionEntryContent), &entry)
suite.NoError(err)
@ -120,7 +120,7 @@ func (suite *serviceBusSuite) TestSubscriptionEntryUnmarshal() {
suite.Equal("PT1M", *entry.Content.SubscriptionDescription.LockDuration)
}
func (suite *serviceBusSuite) TestSubscriptionUnmarshal() {
func (suite *serviceBusSuite) TestSubscriptionEntity_Unmarshal() {
var entry subscriptionEntry
err := xml.Unmarshal([]byte(subscriptionEntryContent), &entry)
suite.NoError(err)
@ -135,7 +135,18 @@ func (suite *serviceBusSuite) TestSubscriptionUnmarshal() {
suite.EqualValues(servicebus.EntityStatusActive, *s.Status)
}
func (suite *serviceBusSuite) TestSubscriptionManagementWrites() {
func (suite *serviceBusSuite) TestSubscriptionManager_NotFound() {
ns := suite.getNewSasInstance()
sm, err := ns.NewSubscriptionManager("foo")
suite.Require().NoError(err)
subEntity, err := sm.Get(context.Background(), "bar")
suite.Nil(subEntity)
suite.Require().NotNil(err)
suite.True(IsErrNotFound(err))
suite.Equal("entity at /foo/subscriptions/bar not found", err.Error())
}
func (suite *serviceBusSuite) TestSubscriptionManagement_Writes() {
tests := map[string]func(context.Context, *testing.T, *SubscriptionManager, string){
"TestPutDefaultSubscription": testPutSubscription,
}
@ -629,7 +640,7 @@ func (suite *serviceBusSuite) subscriptionMessageTest(tests map[string]func(cont
func makeSubscription(ctx context.Context, t *testing.T, topic *Topic, name string, opts ...SubscriptionManagementOption) func() {
sm := topic.NewSubscriptionManager()
entity, err := sm.Get(ctx, name)
if !assert.NoError(t, err) {
if assert.Error(t, err) && !IsErrNotFound(err) {
assert.FailNow(t, "could not GET a subscription")
}

Просмотреть файл

@ -127,7 +127,11 @@ func (t *Topic) Close(ctx context.Context) error {
defer span.Finish()
if t.sender != nil {
return t.sender.Close(ctx)
err := t.sender.Close(ctx)
if err != nil && !isConnectionClosed(err) {
log.For(ctx).Error(err)
return err
}
}
return nil

Просмотреть файл

@ -167,7 +167,7 @@ func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, err
}
if res.StatusCode == http.StatusNotFound {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
b, err := ioutil.ReadAll(res.Body)
@ -180,7 +180,7 @@ func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, err
err = xml.Unmarshal(b, &entry)
if err != nil {
if isEmptyFeed(b) {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
return nil, formatManagementError(b)
}

Просмотреть файл

@ -76,7 +76,7 @@ const (
</entry>`
)
func (suite *serviceBusSuite) TestTopicEntryUnmarshal() {
func (suite *serviceBusSuite) TestTopicEntry_Unmarshal() {
var entry topicEntry
err := xml.Unmarshal([]byte(topicEntry1), &entry)
suite.Nil(err)
@ -88,7 +88,7 @@ func (suite *serviceBusSuite) TestTopicEntryUnmarshal() {
suite.NotNil(entry.Content)
}
func (suite *serviceBusSuite) TestTopicUnmarshal() {
func (suite *serviceBusSuite) TestTopicEntryAndDescription_Unmarshal() {
var entry atom.Entry
err := xml.Unmarshal([]byte(topicEntry1), &entry)
suite.Nil(err)
@ -107,7 +107,17 @@ func (suite *serviceBusSuite) TestTopicUnmarshal() {
suite.EqualValues(servicebus.EntityStatusActive, *td.Status)
}
func (suite *serviceBusSuite) TestTopicManagementWrites() {
func (suite *serviceBusSuite) TestTopicManager_NotFound() {
ns := suite.getNewSasInstance()
tm := ns.NewTopicManager()
subEntity, err := tm.Get(context.Background(), "bar")
suite.Nil(subEntity)
suite.Require().NotNil(err)
suite.True(IsErrNotFound(err))
suite.Equal("entity at /bar not found", err.Error())
}
func (suite *serviceBusSuite) TestTopicManagement_Writes() {
tests := map[string]func(context.Context, *testing.T, *TopicManager, string){
"TestPutDefaultTopic": testPutTopic,
}
@ -319,7 +329,7 @@ func testTopicSend(ctx context.Context, t *testing.T, topic *Topic) {
func makeTopic(ctx context.Context, t *testing.T, ns *Namespace, name string, opts ...TopicManagementOption) func() {
tm := ns.NewTopicManager()
entity, err := tm.Get(ctx, name)
if !assert.NoError(t, err) {
if err != nil && !IsErrNotFound(err) {
assert.FailNow(t, "could not GET a subscription")
}