bump amqp version and ensure sender is using unsettled

This commit is contained in:
David Justice 2019-04-17 12:08:45 -07:00
Родитель 783ca54fd1
Коммит bf33d82c5a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
7 изменённых файлов: 120 добавлений и 10 удалений

2
go.mod
Просмотреть файл

@ -14,5 +14,5 @@ require (
github.com/sirupsen/logrus v1.1.1
github.com/stretchr/testify v1.2.2
go.opencensus.io v0.18.0
pack.ag/amqp v0.10.2
pack.ag/amqp v0.11.0
)

4
go.sum
Просмотреть файл

@ -73,5 +73,5 @@ gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbK
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
pack.ag/amqp v0.8.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
pack.ag/amqp v0.10.2 h1:tOg29Eqx2kmgcDJa7OAjH9N3jqGA1gHf5iIAnBMsa5U=
pack.ag/amqp v0.10.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
pack.ag/amqp v0.11.0 h1:ot/IA0enDkt4/c8xfbCO7AZzjM4bHys/UffnFmnHUnU=
pack.ag/amqp v0.11.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=

25
hub.go
Просмотреть файл

@ -528,18 +528,29 @@ func (h *Hub) Close(ctx context.Context) error {
if h.sender != nil {
if err := h.sender.Close(ctx); err != nil {
log.For(ctx).Error(err)
if rErr := h.closeReceivers(ctx); rErr != nil {
log.For(ctx).Error(rErr)
if !isConnectionClosed(rErr) {
log.For(ctx).Error(rErr)
}
}
// return originating error
return err
if !isConnectionClosed(err) {
log.For(ctx).Error(err)
return err
}
return nil
}
}
// close receivers and return error
return h.closeReceivers(ctx)
err := h.closeReceivers(ctx)
if err != nil && !isConnectionClosed(err) {
log.For(ctx).Error(err)
return err
}
return nil
}
// closeReceivers will close the receivers on the hub and return the last error
@ -696,3 +707,7 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) {
}
return h.sender, nil
}
func isConnectionClosed(err error) bool {
return err.Error() == "amqp: connection closed"
}

90
hub_examples_test.go Normal file
Просмотреть файл

@ -0,0 +1,90 @@
package eventhub_test
import (
"context"
"fmt"
"os"
"time"
"github.com/joho/godotenv"
"github.com/Azure/azure-event-hubs-go"
)
func init() {
if err := godotenv.Load(); err != nil {
fmt.Println("FATAL: ", err)
}
}
func ExampleHub_helloWorld(){
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()
connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
if connStr == "" {
fmt.Println("FATAL: expected environment variable EVENTHUB_CONNECTION_STRING not set")
return
}
hubManager, err := eventhub.NewHubManagerFromConnectionString(connStr)
if err != nil {
fmt.Println(err)
return
}
hubEntity, err := ensureHub(ctx, hubManager, "ExampleHub_helloWorld")
if err != nil {
fmt.Println(err)
return
}
// Create a client to communicate with EventHub
hub, err := eventhub.NewHubFromConnectionString(connStr + ";EntityPath=" + hubEntity.Name)
if err != nil {
fmt.Println(err)
return
}
err = hub.Send(ctx, eventhub.NewEventFromString("Hello World!"))
if err != nil {
fmt.Println(err)
return
}
exit := make(chan struct{})
handler := func(ctx context.Context, event *eventhub.Event) error {
text := string(event.Data)
fmt.Println(text)
exit <- struct{}{}
return nil
}
for _, partitionID := range *hubEntity.PartitionIDs {
_, err = hub.Receive(ctx, partitionID, handler)
}
// wait for the first handler to get called with "Hello World!"
<-exit
err = hub.Close(ctx)
if err != nil {
fmt.Println(err)
return
}
// Output: Hello World!
}
func ensureHub(ctx context.Context, em *eventhub.HubManager, name string, opts ...eventhub.HubManagementOption) (*eventhub.HubEntity, error) {
he, err := em.Get(ctx, name)
if err == nil {
_ = em.Delete(ctx, name)
}
he, err = em.Put(ctx, name, opts...)
if err != nil {
fmt.Println(err)
return nil, err
}
return he, nil
}

Просмотреть файл

@ -133,6 +133,7 @@ func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func()) {
suite.Require().NotNil(model)
suite.Require().NotNil(model.PartitionIds)
suite.Require().Len(*model.PartitionIds, 4)
time.Sleep(250 * time.Millisecond) // introduce a bit of a delay before using the hub
return model, func() {
if model != nil {
suite.DeleteEventHub(*model.Name)

Просмотреть файл

@ -233,7 +233,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
}
amqpSender, err := amqpSession.NewSender(
amqp.LinkReceiverSettle(amqp.ModeSecond),
amqp.LinkSenderSettle(amqp.ModeUnsettled),
amqp.LinkTargetAddress(s.getAddress()),
)
if err != nil {

Просмотреть файл

@ -194,7 +194,11 @@ func (ts *testSuite) newTestContainerByName(containerName string) func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if res, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{}); err != nil {
ts.NoError(err, res)
msg := "error deleting container url"
if res != nil {
msg = fmt.Sprintf("status code: %q; error code: %q", res.StatusCode(), res.ErrorCode())
}
ts.NoError(err, msg)
}
}
}