move everything over to tab for tracing abstraction

This commit is contained in:
David Justice 2019-05-29 16:02:52 -07:00
Родитель d3d1b70a11
Коммит 179ca7d618
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
21 изменённых файлов: 662 добавлений и 246 удалений

Просмотреть файл

@ -28,8 +28,8 @@ import (
"time"
"github.com/Azure/azure-amqp-common-go/rpc"
"github.com/devigned/tab"
"github.com/mitchellh/mapstructure"
"go.opencensus.io/trace"
"pack.ag/amqp"
)
@ -83,7 +83,7 @@ func newClient(namespace *namespace, hubName string) *client {
// GetHubRuntimeInformation requests runtime information for an Event Hub
func (c *client) GetHubRuntimeInformation(ctx context.Context, conn *amqp.Client) (*HubRuntimeInformation, error) {
ctx, span := trace.StartSpan(ctx, "eh.mgmt.client.GetHubRuntimeInformation")
ctx, span := tab.StartSpan(ctx, "eh.mgmt.client.GetHubRuntimeInformation")
defer span.End()
rpcLink, err := rpc.NewLink(conn, address)
@ -117,7 +117,7 @@ func (c *client) GetHubRuntimeInformation(ctx context.Context, conn *amqp.Client
// GetHubPartitionRuntimeInformation fetches runtime information from the AMQP management node for a given partition
func (c *client) GetHubPartitionRuntimeInformation(ctx context.Context, conn *amqp.Client, partitionID string) (*HubPartitionRuntimeInformation, error) {
ctx, span := trace.StartSpan(ctx, "eh.mgmt.client.GetHubPartitionRuntimeInformation")
ctx, span := tab.StartSpan(ctx, "eh.mgmt.client.GetHubPartitionRuntimeInformation")
defer span.End()
rpcLink, err := rpc.NewLink(conn, address)

Просмотреть файл

@ -26,7 +26,7 @@ import (
"context"
"io"
"github.com/Azure/azure-amqp-common-go/persist"
"github.com/Azure/azure-event-hubs-go/persist"
)
type (

Просмотреть файл

@ -35,13 +35,13 @@ import (
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/Azure/azure-amqp-common-go/conn"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/Azure/azure-amqp-common-go/persist"
"github.com/Azure/azure-amqp-common-go/sas"
"github.com/Azure/azure-amqp-common-go/uuid"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-event-hubs-go/persist"
"github.com/Azure/go-autorest/autorest/azure"
"go.opencensus.io/trace"
"github.com/devigned/tab"
)
const (
@ -125,19 +125,19 @@ func NewFromConnectionString(ctx context.Context, connStr string, leaser Leaser,
hostName, err := uuid.NewV4()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
parsed, err := conn.ParsedConnectionFromStr(connStr)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
tokenProvider, err := sas.NewTokenProvider(sas.TokenProviderWithKey(parsed.KeyName, parsed.Key))
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
@ -167,13 +167,13 @@ func NewFromConnectionString(ctx context.Context, connStr string, leaser Leaser,
client, err := eventhub.NewHubFromConnectionString(connStr, hubOpts...)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
runtimeInfo, err := client.GetRuntimeInformation(ctx)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
@ -276,7 +276,7 @@ func (h *EventProcessorHost) UnregisterHandler(ctx context.Context, id HandlerID
if len(h.handlers) == 0 {
if err := h.Close(ctx); err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
}
}
@ -300,8 +300,8 @@ func (h *EventProcessorHost) Start(ctx context.Context) error {
}
go func() {
span := trace.FromContext(ctx)
ctx := trace.NewContext(context.Background(), span)
span := tab.FromContext(ctx)
ctx := tab.NewContext(context.Background(), span)
h.scheduler.Run(ctx)
}()
@ -326,8 +326,8 @@ func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error {
}
go func() {
span := trace.FromContext(ctx)
ctx := trace.NewContext(context.Background(), span)
span := tab.FromContext(ctx)
ctx := tab.NewContext(context.Background(), span)
h.scheduler.Run(ctx)
}()
@ -416,7 +416,7 @@ func (h *EventProcessorHost) compositeHandlers() eventhub.Handler {
wg.Add(1)
go func(boundHandle eventhub.Handler) {
if err := boundHandle(ctx, event); err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
wg.Done()
}(handle)
@ -438,9 +438,9 @@ func (c checkpointPersister) Read(namespace, name, consumerGroup, partitionID st
return c.checkpointer.EnsureCheckpoint(ctx, partitionID)
}
func startConsumerSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) {
ctx, span := trace.StartSpan(ctx, operationName, opts...)
func startConsumerSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
ctx, span := tab.StartSpan(ctx, operationName)
eventhub.ApplyComponentInfo(span)
span.AddAttributes(trace.StringAttribute("span.kind", "consumer"))
span.AddAttributes(tab.StringAttribute("span.kind", "consumer"))
return span, ctx
}

Просмотреть файл

@ -29,9 +29,9 @@ import (
"math/rand"
"time"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/devigned/tab"
"github.com/Azure/azure-event-hubs-go"
"go.opencensus.io/trace"
)
type (
@ -102,7 +102,7 @@ func (lr *leasedReceiver) listenForClose() {
defer span.End()
err := lr.processor.scheduler.stopReceiver(ctx, lr.lease)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
}()
}
@ -120,7 +120,7 @@ func (lr *leasedReceiver) periodicallyRenewLease(ctx context.Context) {
time.Sleep(DefaultLeaseRenewalInterval + skew)
err := lr.tryRenew(ctx)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
_ = lr.processor.scheduler.stopReceiver(ctx, lr.lease)
}
}
@ -133,12 +133,12 @@ func (lr *leasedReceiver) tryRenew(ctx context.Context) error {
lease, ok, err := lr.processor.leaser.RenewLease(ctx, lr.lease.GetPartitionID())
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
if !ok {
err = errors.New("can't renew lease")
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
lr.dlog(ctx, "lease renewed")
@ -150,15 +150,15 @@ func (lr *leasedReceiver) dlog(ctx context.Context, msg string) {
name := lr.processor.name
partitionID := lr.lease.GetPartitionID()
epoch := lr.lease.GetEpoch()
log.For(ctx).Debug(fmt.Sprintf("eph %q, partition %q, epoch %d: "+msg, name, partitionID, epoch))
tab.For(ctx).Debug(fmt.Sprintf("eph %q, partition %q, epoch %d: "+msg, name, partitionID, epoch))
}
func (lr *leasedReceiver) startConsumerSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) {
span, ctx := startConsumerSpanFromContext(ctx, operationName, opts...)
func (lr *leasedReceiver) startConsumerSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
span, ctx := startConsumerSpanFromContext(ctx, operationName)
span.AddAttributes(
trace.StringAttribute("eph.id", lr.processor.name),
trace.StringAttribute(partitionIDTag, lr.lease.GetPartitionID()),
trace.Int64Attribute(epochTag, lr.lease.GetEpoch()),
tab.StringAttribute("eph.id", lr.processor.name),
tab.StringAttribute(partitionIDTag, lr.lease.GetPartitionID()),
tab.Int64Attribute(epochTag, lr.lease.GetEpoch()),
)
return span, ctx
}

Просмотреть файл

@ -28,9 +28,10 @@ import (
"sync"
"time"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/Azure/azure-amqp-common-go/persist"
"github.com/Azure/azure-amqp-common-go/uuid"
"github.com/devigned/tab"
"github.com/Azure/azure-event-hubs-go/persist"
)
type (
@ -285,7 +286,7 @@ func (ml *memoryLeaserCheckpointer) AcquireLease(ctx context.Context, partitionI
lease.leaser = ml
uuidToken, err := uuid.NewV4()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, false, err
}

Просмотреть файл

@ -29,8 +29,7 @@ import (
"sync"
"time"
"github.com/Azure/azure-amqp-common-go/log"
"go.opencensus.io/trace"
"github.com/devigned/tab"
)
var (
@ -103,7 +102,7 @@ func (s *scheduler) scan(ctx context.Context) {
allLeases, err := s.processor.leaser.GetLeases(leaseCtx)
cancel()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return
}
@ -118,7 +117,7 @@ func (s *scheduler) scan(ctx context.Context) {
acquired, notAcquired, err := s.acquireExpiredLeases(ctx, allLeases)
s.dlog(ctx, fmt.Sprintf("acquired: %v, not acquired: %v", acquired, notAcquired))
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return
}
@ -126,7 +125,7 @@ func (s *scheduler) scan(ctx context.Context) {
for _, lease := range acquired {
if err := s.startReceiver(ctx, lease); err != nil {
_, _ = s.processor.leaser.ReleaseLease(ctx, lease.GetPartitionID())
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return
}
}
@ -160,14 +159,14 @@ func (s *scheduler) scan(ctx context.Context) {
cancel()
switch {
case err != nil:
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
case !ok:
s.dlog(ctx, fmt.Sprintf("failed to steal: %v", candidate))
default:
s.dlog(ctx, fmt.Sprintf("stole: %v", stolen))
if err := s.startReceiver(ctx, stolen); err != nil {
_, _ = s.processor.leaser.ReleaseLease(acquireCtx, candidate.GetPartitionID())
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return
}
}
@ -221,18 +220,18 @@ func (s *scheduler) startReceiver(ctx context.Context, lease LeaseMarker) error
if receiver, ok := s.receivers[lease.GetPartitionID()]; ok {
// receiver thinks it's already running... this is probably a bug if it happens
if err := receiver.Close(ctx); err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
delete(s.receivers, lease.GetPartitionID())
}
span.AddAttributes(
trace.StringAttribute(partitionIDTag, lease.GetPartitionID()),
trace.Int64Attribute(epochTag, lease.GetEpoch()),
tab.StringAttribute(partitionIDTag, lease.GetPartitionID()),
tab.Int64Attribute(epochTag, lease.GetEpoch()),
)
lr := newLeasedReceiver(s.processor, lease)
if err := lr.Run(ctx); err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
s.receivers[lease.GetPartitionID()] = lr
@ -247,8 +246,8 @@ func (s *scheduler) stopReceiver(ctx context.Context, lease LeaseMarker) error {
defer span.End()
span.AddAttributes(
trace.StringAttribute(partitionIDTag, lease.GetPartitionID()),
trace.Int64Attribute(epochTag, lease.GetEpoch()),
tab.StringAttribute(partitionIDTag, lease.GetPartitionID()),
tab.Int64Attribute(epochTag, lease.GetEpoch()),
)
s.dlog(ctx, fmt.Sprintf("stopping receiver for partitionID %q", lease.GetPartitionID()))
if receiver, ok := s.receivers[lease.GetPartitionID()]; ok {
@ -257,7 +256,7 @@ func (s *scheduler) stopReceiver(ctx context.Context, lease LeaseMarker) error {
err := receiver.Close(ctx)
delete(s.receivers, lease.GetPartitionID())
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
}
@ -292,7 +291,7 @@ func (s *scheduler) acquireExpiredLeases(ctx context.Context, leases []LeaseMark
func (s *scheduler) dlog(ctx context.Context, msg string) {
name := s.processor.name
log.For(ctx).Debug(fmt.Sprintf("eph %q: "+msg, name))
tab.For(ctx).Debug(fmt.Sprintf("eph %q: "+msg, name))
}
func (s *scheduler) leaseToSteal(ctx context.Context, candidates []LeaseMarker, myLeaseCount int) (LeaseMarker, bool) {
@ -302,7 +301,7 @@ func (s *scheduler) leaseToSteal(ctx context.Context, candidates []LeaseMarker,
biggestOwner := ownerWithMostLeases(candidates)
if biggestOwner != nil && s.processor.GetName() != biggestOwner.Owner {
leasesByOwner := leasesByOwner(candidates)
log.For(ctx).Debug(fmt.Sprintf("i am %v, the biggest owner is %v and leases by owner: %v", s.processor.GetName(), biggestOwner.Owner, leasesByOwner))
tab.For(ctx).Debug(fmt.Sprintf("i am %v, the biggest owner is %v and leases by owner: %v", s.processor.GetName(), biggestOwner.Owner, leasesByOwner))
if leasesByOwner[biggestOwner.Owner] != nil &&
(len(biggestOwner.Leases)-myLeaseCount) >= 2 && len(leasesByOwner[biggestOwner.Owner]) >= 1 {
selection := rand.Intn(len(leasesByOwner[biggestOwner.Owner]))
@ -337,8 +336,8 @@ func leasesByOwner(candidates []LeaseMarker) map[string][]LeaseMarker {
return byOwner
}
func (s *scheduler) startConsumerSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) {
span, ctx := startConsumerSpanFromContext(ctx, operationName, opts...)
span.AddAttributes(trace.StringAttribute("eph.id", s.processor.name))
func (s *scheduler) startConsumerSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
span, ctx := startConsumerSpanFromContext(ctx, operationName)
span.AddAttributes(tab.StringAttribute("eph.id", s.processor.name))
return span, ctx
}

Просмотреть файл

@ -28,9 +28,10 @@ import (
"strings"
"time"
"github.com/Azure/azure-amqp-common-go/persist"
"github.com/mitchellh/mapstructure"
"pack.ag/amqp"
"github.com/Azure/azure-event-hubs-go/persist"
)
const (
@ -113,7 +114,12 @@ func (e *Event) GetCheckpoint() persist.Checkpoint {
return persist.NewCheckpoint(offset, sequenceNumber, enqueueTime)
}
// Set will set a key in the event properties
// GetKeyValues implements tab.Carrier
func (e *Event) GetKeyValues() map[string]interface{} {
return e.Properties
}
// Set implements tab.Carrier
func (e *Event) Set(key string, value interface{}) {
if e.Properties == nil {
e.Properties = make(map[string]interface{})

34
go.mod
Просмотреть файл

@ -1,19 +1,31 @@
module github.com/Azure/azure-event-hubs-go
require (
github.com/Azure/azure-amqp-common-go v1.1.4
github.com/Azure/azure-pipeline-go v0.1.8
github.com/Azure/azure-sdk-for-go v21.3.0+incompatible
github.com/Azure/azure-storage-blob-go v0.0.0-20181023070848-cf01652132cc
github.com/Azure/go-autorest v11.1.1+incompatible
github.com/dimchansky/utfbom v1.0.0 // indirect
github.com/Azure/azure-amqp-common-go v1.1.5-0.20190529222532-b92ea57d4550
github.com/Azure/azure-pipeline-go v0.1.9
github.com/Azure/azure-sdk-for-go v30.0.0+incompatible
github.com/Azure/azure-storage-blob-go v0.6.0
github.com/Azure/go-autorest v12.0.0+incompatible
github.com/devigned/tab v0.1.1
github.com/devigned/tab/opencensus v0.1.1
github.com/dimchansky/utfbom v1.1.0 // indirect
github.com/fortytw2/leaktest v1.3.0 // indirect
github.com/google/go-cmp v0.3.0 // indirect
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/joho/godotenv v1.3.0
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7
github.com/mitchellh/go-homedir v1.0.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.1.2
github.com/sirupsen/logrus v1.1.1
github.com/stretchr/testify v1.2.2
go.opencensus.io v0.18.0
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519
github.com/pkg/errors v0.8.1 // indirect
github.com/sirupsen/logrus v1.2.0
github.com/stretchr/testify v1.3.0
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f // indirect
golang.org/x/net v0.0.0-20190522155817-f3200d17e092
golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
golang.org/x/sys v0.0.0-20190529164535-6a60838ec259 // indirect
golang.org/x/text v0.3.2 // indirect
google.golang.org/api v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20190522204451-c2c4e71fbf69 // indirect
google.golang.org/grpc v1.21.0 // indirect
pack.ag/amqp v0.11.0
)

162
go.sum
Просмотреть файл

@ -1,75 +1,153 @@
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/Azure/azure-amqp-common-go v1.1.4 h1:DmPXxmLZwi/71CgRTZIKR6yiKEW3eC42S4gSBhfG7y0=
github.com/Azure/azure-amqp-common-go v1.1.4/go.mod h1:FhZtXirFANw40UXI2ntweO+VOkfaw8s6vZxUiRhLYW8=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
contrib.go.opencensus.io/exporter/ocagent v0.5.0 h1:TKXjQSRS0/cCDrP7KvkgU6SmILtF/yV2TOs/02K/WZQ=
contrib.go.opencensus.io/exporter/ocagent v0.5.0/go.mod h1:ImxhfLRpxoYiSq891pBrLVhN+qmP8BTVvdH2YLs7Gl0=
github.com/Azure/azure-amqp-common-go v1.1.5-0.20190529222532-b92ea57d4550 h1:gYyXuDqyVPLoPRGEc+0euG7IKrEskcXxD8sn8ZF4rdQ=
github.com/Azure/azure-amqp-common-go v1.1.5-0.20190529222532-b92ea57d4550/go.mod h1:1ZplNYSTFCYf0E6VZjZBTUEbj10mr75PaWv3Ve5sLBg=
github.com/Azure/azure-pipeline-go v0.1.8 h1:KmVRa8oFMaargVesEuuEoiLCQ4zCCwQ8QX/xg++KS20=
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-sdk-for-go v21.3.0+incompatible h1:YFvAka2WKAl2xnJkYV1e1b7E2z88AgFszDzWU18ejMY=
github.com/Azure/azure-sdk-for-go v21.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.0.0-20181023070848-cf01652132cc h1:BElWmFfsryQD72OcovStKpkIcd4e9ozSkdsTNQDSHGk=
github.com/Azure/azure-storage-blob-go v0.0.0-20181023070848-cf01652132cc/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/go-autorest v11.0.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest v11.1.1+incompatible h1:kqw9PTHZBZKk6kSv/S7L/qxKKcz6hBDnmjWJU5RnHTw=
github.com/Azure/go-autorest v11.1.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/Azure/azure-pipeline-go v0.1.9 h1:u7JFb9fFTE6Y/j8ae2VK33ePrRqJqoCM/IWkQdAZ+rg=
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-sdk-for-go v29.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v30.0.0+incompatible h1:6o1Yzl7wTBYg+xw0pY4qnalaPmEQolubEEdepo1/kmI=
github.com/Azure/azure-sdk-for-go v30.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.6.0 h1:SEATKb3LIHcaSIX+E6/K4kJpwfuozFEsmt5rS56N6CE=
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/go-autorest v12.0.0+incompatible h1:N+VqClcomLGD/sHb3smbSYYtNMgKpVV3Cd5r5i8z6bQ=
github.com/Azure/go-autorest v12.0.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4=
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/devigned/tab v0.0.1/go.mod h1:oVYrfgGyond090gxCvvbjZji79+peOiSV6vhZhKJM0Y=
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/devigned/tab/opencensus v0.1.1 h1:KX8QdRsPPEb6Wg+hWxk+gVSMvCvpXBpN3Hp3fOU1Efo=
github.com/devigned/tab/opencensus v0.1.1/go.mod h1:U6xXMXnNwXJpdaK0mnT3zdng4WTi+vCfqn7YHofEv2A=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dimchansky/utfbom v1.0.0 h1:fGC2kkf4qOoKqZ4q7iIh+Vef4ubC1c38UDsEyZynZPc=
github.com/dimchansky/utfbom v1.0.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TRo4=
github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
github.com/fortytw2/leaktest v1.2.0 h1:cj6GCiwJDH7l3tMHLjZDo0QqPtrXJiWSI9JgpeQKw+Q=
github.com/fortytw2/leaktest v1.2.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE=
github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 h1:K//n/AqR5HjG3qxbrBCL4vJPW0MVFSs9CPK1OOJdRME=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe h1:CHRGQ8V7OlCYtwaKPJi3iA7J+YdNKdo8j7nG5IgDhjs=
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/go-homedir v1.0.0 h1:vKb8ShqSby24Yrqr/yDYkuFz8d0WUjys40rvnGC8aR0=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/sirupsen/logrus v1.1.1 h1:VzGj7lhU7KEB9e9gMpAV/v5XT2NVSvLJhJLCWbnkgXg=
github.com/sirupsen/logrus v1.1.1/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8qsT7A+A=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.opencensus.io v0.18.0 h1:Mk5rgZcggtbvtAun5aJzAtjKKN/t0R3jJPlWILlv938=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
go.opencensus.io v0.21.0 h1:mU6zScU4U1YAFPHEHYk+3JC4SY7JxgkqS10ZOSyksNg=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4 h1:Vk3wNqEZwyGyei9yq5ekj7frek2u7HUfffJ1/opblzc=
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f h1:R423Cnkcp5JABoeemiGEPlt9tHXFfw5kvc0yqlxRPWo=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092 h1:4QSRKanuywn15aTZvI/mIDEgPQpswuFndXpOj3rKEco=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0Is3p+EHBKNgquIzo1OI=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190529164535-6a60838ec259 h1:so6Hr/LodwSZ5UQDu/7PmQiDeS112WwtLvU3lpSPZTU=
golang.org/x/sys v0.0.0-20190529164535-6a60838ec259/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
google.golang.org/api v0.4.0 h1:KKgc1aqhV8wDPbDzlDtpvyjZFY3vjz85FP7p4wcQUyI=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.5.0 h1:lj9SyhMzyoa38fgFF0oO2T6pjs5IzkLPKfVtxpyCRMM=
google.golang.org/api v0.5.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19 h1:Lj2SnHtxkRGJDqnGaSjo+CCdIieEnwVazbOXILwQemk=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190522204451-c2c4e71fbf69 h1:4rNOqY4ULrKzS6twXa619uQgI7h9PaVd4ZhjFQ7C5zs=
google.golang.org/genproto v0.0.0-20190522204451-c2c4e71fbf69/go.mod h1:z3L6/3dTEVtUr6QSP8miRzeRqwQOioJ9I66odjN4I7s=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.0 h1:G+97AoqBnmZIT91cLG/EkCoK9NSelj64P8bOHHNmGn0=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbKtsY9fheQiQn0MZRVLfQ=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
pack.ag/amqp v0.8.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
pack.ag/amqp v0.11.0 h1:ot/IA0enDkt4/c8xfbCO7AZzjM4bHys/UffnFmnHUnU=
pack.ag/amqp v0.11.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=

Просмотреть файл

@ -34,7 +34,7 @@ import (
"time"
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/devigned/tab"
)
const (
@ -117,7 +117,7 @@ func (em *entityManager) Execute(ctx context.Context, method string, entityPath
}
req, err := http.NewRequest(method, em.Host+strings.TrimPrefix(entityPath, "/"), body)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
@ -126,7 +126,7 @@ func (em *entityManager) Execute(ctx context.Context, method string, entityPath
applyRequestInfo(span, req)
req, err = em.addAuthorization(req)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
@ -134,7 +134,7 @@ func (em *entityManager) Execute(ctx context.Context, method string, entityPath
res, err := client.Do(req)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
if res != nil {

40
hub.go
Просмотреть файл

@ -36,16 +36,16 @@ import (
"github.com/Azure/azure-amqp-common-go/aad"
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/Azure/azure-amqp-common-go/conn"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/Azure/azure-amqp-common-go/persist"
"github.com/Azure/azure-amqp-common-go/sas"
"github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/date"
"github.com/Azure/go-autorest/autorest/to"
"github.com/devigned/tab"
"pack.ag/amqp"
"github.com/Azure/azure-event-hubs-go/atom"
"github.com/Azure/azure-event-hubs-go/persist"
)
const (
@ -216,7 +216,7 @@ func (hm *HubManager) Put(ctx context.Context, name string, opts ...HubManagemen
reqBytes, err := xml.Marshal(he)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
@ -227,13 +227,13 @@ func (hm *HubManager) Put(ctx context.Context, name string, opts ...HubManagemen
}
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
b, err := ioutil.ReadAll(res.Body)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
@ -256,13 +256,13 @@ func (hm *HubManager) List(ctx context.Context) ([]*HubEntity, error) {
}
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
b, err := ioutil.ReadAll(res.Body)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
@ -290,7 +290,7 @@ func (hm *HubManager) Get(ctx context.Context, name string) (*HubEntity, error)
}
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
@ -300,7 +300,7 @@ func (hm *HubManager) Get(ctx context.Context, name string) (*HubEntity, error)
b, err := ioutil.ReadAll(res.Body)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
@ -489,19 +489,19 @@ func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation
client := newClient(h.namespace, h.name)
c, err := h.namespace.newConnection()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
defer func() {
if err := c.Close(); err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
}()
info, err := client.GetHubRuntimeInformation(ctx, c)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
@ -515,13 +515,13 @@ func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (
client := newClient(h.namespace, h.name)
c, err := h.namespace.newConnection()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
defer func() {
if err := c.Close(); err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
}()
@ -542,12 +542,12 @@ func (h *Hub) Close(ctx context.Context) error {
if err := h.sender.Close(ctx); err != nil {
if rErr := h.closeReceivers(ctx); rErr != nil {
if !isConnectionClosed(rErr) {
log.For(ctx).Error(rErr)
tab.For(ctx).Error(rErr)
}
}
if !isConnectionClosed(err) {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
@ -558,7 +558,7 @@ func (h *Hub) Close(ctx context.Context) error {
// close receivers and return error
err := h.closeReceivers(ctx)
if err != nil && !isConnectionClosed(err) {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
@ -573,7 +573,7 @@ func (h *Hub) closeReceivers(ctx context.Context) error {
var lastErr error
for _, r := range h.receivers {
if err := r.Close(ctx); err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
lastErr = err
}
}
@ -610,7 +610,7 @@ func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler,
// Todo: change this to use name rather than identifier
if r, ok := h.receivers[receiver.getIdentifier()]; ok {
if err := r.Close(ctx); err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
}
@ -720,7 +720,7 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) {
if h.sender == nil {
s, err := h.newSender(ctx)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, err
}
h.sender = s

Просмотреть файл

@ -27,6 +27,10 @@ import (
"runtime"
"strings"
// use OpenCensus tracing by default; to use opentracing add the following import to your application
// `_ "github.com/devigned/tab/opentracing"`
_ "github.com/devigned/tab/opencensus"
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/Azure/azure-amqp-common-go/cbs"
"github.com/Azure/azure-amqp-common-go/conn"

69
persist/checkpoint.go Normal file
Просмотреть файл

@ -0,0 +1,69 @@
package persist
// MIT License
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE
import (
"time"
)
const (
// StartOfStream is a constant defined to represent the start of a partition stream in EventHub.
StartOfStream = "-1"
// EndOfStream is a constant defined to represent the current end of a partition stream in EventHub.
// This can be used as an offset argument in receiver creation to start receiving from the latest
// event, instead of a specific offset or point in time.
EndOfStream = "@latest"
)
type (
// Checkpoint is the information needed to determine the last message processed
Checkpoint struct {
Offset string `json:"offset"`
SequenceNumber int64 `json:"sequenceNumber"`
EnqueueTime time.Time `json:"enqueueTime"`
}
)
// NewCheckpointFromStartOfStream returns a checkpoint for the start of the stream
func NewCheckpointFromStartOfStream() Checkpoint {
return Checkpoint{
Offset: StartOfStream,
}
}
// NewCheckpointFromEndOfStream returns a checkpoint for the end of the stream
func NewCheckpointFromEndOfStream() Checkpoint {
return Checkpoint{
Offset: EndOfStream,
}
}
// NewCheckpoint contains the information needed to checkpoint Event Hub progress
func NewCheckpoint(offset string, sequence int64, enqueueTime time.Time) Checkpoint {
return Checkpoint{
Offset: offset,
SequenceNumber: sequence,
EnqueueTime: enqueueTime,
}
}

100
persist/file.go Normal file
Просмотреть файл

@ -0,0 +1,100 @@
package persist
// MIT License
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE
import (
"bytes"
"encoding/json"
"io"
"os"
"path"
"strings"
"sync"
)
type (
// FilePersister implements CheckpointPersister for saving to the file system
FilePersister struct {
directory string
mu sync.Mutex
}
)
// NewFilePersister creates a FilePersister for saving to a given directory
func NewFilePersister(directory string) (*FilePersister, error) {
err := os.MkdirAll(directory, 0777)
return &FilePersister{
directory: directory,
}, err
}
func (fp *FilePersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error {
fp.mu.Lock()
defer fp.mu.Unlock()
key := getFilePath(namespace, name, consumerGroup, partitionID)
filePath := path.Join(fp.directory, key)
bits, err := json.Marshal(checkpoint)
if err != nil {
return err
}
file, err := os.Create(filePath)
if err != nil {
return err
}
_, err = file.Write(bits)
if err != nil {
return err
}
return file.Close()
}
func (fp *FilePersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error) {
fp.mu.Lock()
defer fp.mu.Unlock()
key := getFilePath(namespace, name, consumerGroup, partitionID)
filePath := path.Join(fp.directory, key)
f, err := os.Open(filePath)
if err != nil {
return NewCheckpointFromStartOfStream(), nil
}
buf := bytes.NewBuffer(nil)
_, err = io.Copy(buf, f)
if err != nil {
return NewCheckpointFromStartOfStream(), err
}
var checkpoint Checkpoint
err = json.Unmarshal(buf.Bytes(), &checkpoint)
return checkpoint, err
}
func getFilePath(namespace, name, consumerGroup, partitionID string) string {
key := strings.Join([]string{namespace, name, consumerGroup, partitionID}, "_")
return strings.Replace(key, "$", "", -1)
}

80
persist/file_test.go Normal file
Просмотреть файл

@ -0,0 +1,80 @@
package persist
// MIT License
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE
import (
"math/rand"
"os"
"path"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
var (
letterRunes = []rune("abcdefghijklmnopqrstuvwxyz123456789")
)
func init() {
rand.Seed(time.Now().Unix())
}
func TestFilePersister_Read(t *testing.T) {
namespace := "namespace"
name := "name"
group := "$Default"
partitionID := "0"
dir := path.Join(os.TempDir(), RandomName("read", 4))
persister, err := NewFilePersister(dir)
assert.Nil(t, err)
ckp, err := persister.Read(namespace, name, group, partitionID)
assert.Nil(t, err)
assert.Equal(t, NewCheckpointFromStartOfStream(), ckp)
}
func TestFilePersister_Write(t *testing.T) {
namespace := "namespace"
name := "name"
group := "$Default"
partitionID := "0"
dir := path.Join(os.TempDir(), RandomName("write", 4))
persister, err := NewFilePersister(dir)
assert.Nil(t, err)
ckp := NewCheckpoint("120", 22, time.Now())
err = persister.Write(namespace, name, group, partitionID, ckp)
assert.Nil(t, err)
ckp2, err := persister.Read(namespace, name, group, partitionID)
assert.Nil(t, err)
assert.Equal(t, ckp.Offset, ckp2.Offset)
assert.Equal(t, ckp.SequenceNumber, ckp2.SequenceNumber)
}
// RandomName generates a random Event Hub name
func RandomName(prefix string, length int) string {
b := make([]rune, length)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return prefix + "-" + string(b)
}

81
persist/persist.go Normal file
Просмотреть файл

@ -0,0 +1,81 @@
// Package persist provides abstract structures for checkpoint persistence.
package persist
// MIT License
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE
import (
"fmt"
"path"
"sync"
)
type (
// CheckpointPersister provides persistence for the received offset for a given namespace, hub name, consumer group, partition Id and
// offset so that if a receiver where to be interrupted, it could resume after the last consumed event.
CheckpointPersister interface {
Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error
Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)
}
// MemoryPersister is a default implementation of a Hub CheckpointPersister, which will persist offset information in
// memory.
MemoryPersister struct {
values map[string]Checkpoint
mu sync.Mutex
}
)
// NewMemoryPersister creates a new in-memory storage for checkpoints
//
// MemoryPersister is only intended to be shared with EventProcessorHosts within the same process. This implementation
// is a toy. You should probably use the Azure Storage implementation or any other that provides durable storage for
// checkpoints.
func NewMemoryPersister() *MemoryPersister {
return &MemoryPersister{
values: make(map[string]Checkpoint),
}
}
func (p *MemoryPersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error {
p.mu.Lock()
defer p.mu.Unlock()
key := getPersistenceKey(namespace, name, consumerGroup, partitionID)
p.values[key] = checkpoint
return nil
}
func (p *MemoryPersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error) {
p.mu.Lock()
defer p.mu.Unlock()
key := getPersistenceKey(namespace, name, consumerGroup, partitionID)
if offset, ok := p.values[key]; ok {
return offset, nil
}
return NewCheckpointFromStartOfStream(), fmt.Errorf("could not read the offset for the key %s", key)
}
func getPersistenceKey(namespace, name, consumerGroup, partitionID string) string {
return path.Join(namespace, name, consumerGroup, partitionID)
}

Просмотреть файл

@ -28,11 +28,10 @@ import (
"time"
"github.com/Azure/azure-amqp-common-go"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/Azure/azure-amqp-common-go/persist"
"go.opencensus.io/trace"
"go.opencensus.io/trace/propagation"
"github.com/devigned/tab"
"pack.ag/amqp"
"github.com/Azure/azure-event-hubs-go/persist"
)
const (
@ -146,7 +145,7 @@ func (h *Hub) newReceiver(ctx context.Context, partitionID string, opts ...Recei
}
}
log.For(ctx).Debug("creating a new receiver")
tab.For(ctx).Debug("creating a new receiver")
err := receiver.newSessionAndLink(ctx)
return receiver, err
}
@ -162,23 +161,23 @@ func (r *receiver) Close(ctx context.Context) error {
err := r.receiver.Close(ctx)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
if sessionErr := r.session.Close(ctx); sessionErr != nil {
log.For(ctx).Error(sessionErr)
tab.For(ctx).Error(sessionErr)
}
if connErr := r.connection.Close(); connErr != nil {
log.For(ctx).Error(connErr)
tab.For(ctx).Error(connErr)
}
return err
}
if sessionErr := r.session.Close(ctx); sessionErr != nil {
log.For(ctx).Error(sessionErr)
tab.For(ctx).Error(sessionErr)
if connErr := r.connection.Close(); connErr != nil {
log.For(ctx).Error(connErr)
tab.For(ctx).Error(connErr)
}
return sessionErr
@ -228,46 +227,40 @@ func (r *receiver) handleMessages(ctx context.Context, messages chan *amqp.Messa
}
func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler Handler) {
const optName = "eh.Receiver.handleMessage"
event, err := eventFromMsg(msg)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
r.lastError = err
r.done()
}
var span *trace.Span
if val, ok := event.Get("_oc_prop"); ok {
if sc, ok := propagation.FromBinary(val.([]byte)); ok {
span, ctx = r.startConsumerSpanFromWire(ctx, "eh.receiver.handleMessage", sc)
}
}
if span == nil {
span, ctx = r.startConsumerSpanFromContext(ctx, "eh.receiver.handleMessage")
}
ctx, span := tab.StartSpanWithRemoteParent(ctx, optName, event)
defer span.End()
id := messageID(msg)
if str, ok := id.(string); ok {
span.AddAttributes(trace.StringAttribute("eh.message_id", str))
span.AddAttributes(tab.StringAttribute("eh.message_id", str))
}
err = handler(ctx, event)
if err != nil {
err = msg.Modify(true, false, nil)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
log.For(ctx).Error(fmt.Errorf("message modified(true, false, nil): id: %v", id))
tab.For(ctx).Error(fmt.Errorf("message modified(true, false, nil): id: %v", id))
return
}
err = msg.Accept()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
err = r.storeLastReceivedCheckpoint(event.GetCheckpoint())
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
}
@ -284,11 +277,11 @@ func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
select {
case <-ctx.Done():
log.For(ctx).Debug("context done")
tab.For(ctx).Debug("context done")
return
default:
if amqpErr, ok := err.(*amqp.DetachError); ok && amqpErr.RemoteError != nil && amqpErr.RemoteError.Condition == "amqp:link:stolen" {
log.For(ctx).Debug("link has been stolen by a higher epoch")
tab.For(ctx).Debug("link has been stolen by a higher epoch")
_ = r.Close(ctx)
return
}
@ -297,10 +290,10 @@ func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
sp, ctx := r.startConsumerSpanFromContext(ctx, "eh.receiver.listenForMessages.tryRecover")
defer sp.End()
log.For(ctx).Debug("recovering connection")
tab.For(ctx).Debug("recovering connection")
err := r.Recover(ctx)
if err == nil {
log.For(ctx).Debug("recovered connection")
tab.For(ctx).Debug("recovered connection")
return nil, nil
}
@ -313,7 +306,7 @@ func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
})
if retryErr != nil {
log.For(ctx).Debug("retried, but error was unrecoverable")
tab.For(ctx).Debug("retried, but error was unrecoverable")
r.lastError = retryErr
_ = r.Close(ctx)
return
@ -328,13 +321,13 @@ func (r *receiver) listenForMessage(ctx context.Context) (*amqp.Message, error)
msg, err := r.receiver.Receive(ctx)
if err != nil {
log.For(ctx).Debug(err.Error())
tab.For(ctx).Debug(err.Error())
return nil, err
}
id := messageID(msg)
if str, ok := id.(string); ok {
span.AddAttributes(trace.StringAttribute("he.message_id", str))
span.AddAttributes(tab.StringAttribute("he.message_id", str))
}
return msg, nil
}
@ -353,25 +346,25 @@ func (r *receiver) newSessionAndLink(ctx context.Context) error {
address := r.getAddress()
err = r.hub.namespace.negotiateClaim(ctx, connection, address)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
amqpSession, err := connection.NewSession()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
offsetExpression, err := r.getOffsetExpression()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
r.session, err = newSession(amqpSession)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
@ -388,7 +381,7 @@ func (r *receiver) newSessionAndLink(ctx context.Context) error {
amqpReceiver, err := amqpSession.NewReceiver(opts...)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}

Просмотреть файл

@ -28,11 +28,9 @@ import (
"net"
"time"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/Azure/azure-amqp-common-go/uuid"
"github.com/devigned/tab"
"github.com/jpillora/backoff"
"go.opencensus.io/trace"
"go.opencensus.io/trace/propagation"
"pack.ag/amqp"
)
@ -52,7 +50,7 @@ type (
SendOption func(event *Event) error
eventer interface {
Set(key string, value interface{})
tab.Carrier
toMsg() (*amqp.Message, error)
}
)
@ -71,7 +69,7 @@ func (h *Hub) newSender(ctx context.Context) (*sender, error) {
Jitter: true,
},
}
log.For(ctx).Debug(fmt.Sprintf("creating a new sender for entity path %s", s.getAddress()))
tab.For(ctx).Debug(fmt.Sprintf("creating a new sender for entity path %s", s.getAddress()))
err := s.newSessionAndLink(ctx)
return s, err
}
@ -97,23 +95,23 @@ func (s *sender) Close(ctx context.Context) error {
err := s.sender.Close(ctx)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
if sessionErr := s.session.Close(ctx); sessionErr != nil {
log.For(ctx).Error(sessionErr)
tab.For(ctx).Error(sessionErr)
}
if connErr := s.connection.Close(); connErr != nil {
log.For(ctx).Error(connErr)
tab.For(ctx).Error(connErr)
}
return err
}
if sessionErr := s.session.Close(ctx); sessionErr != nil {
log.For(ctx).Error(sessionErr)
tab.For(ctx).Error(sessionErr)
if connErr := s.connection.Close(); connErr != nil {
log.For(ctx).Error(connErr)
tab.For(ctx).Error(connErr)
}
return sessionErr
@ -151,25 +149,30 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
sp, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.trySend")
defer sp.End()
evt.Set("_oc_prop", propagation.Binary(sp.SpanContext()))
if err := sp.Inject(evt); err != nil {
tab.For(ctx).Error(err)
return err
}
msg, err := evt.toMsg()
if err != nil {
tab.For(ctx).Error(err)
return err
}
if str, ok := msg.Properties.MessageID.(string); ok {
sp.AddAttributes(trace.StringAttribute("he.message_id", str))
sp.AddAttributes(tab.StringAttribute("he.message_id", str))
}
recvr := func(err error) {
duration := s.recoveryBackoff.Duration()
log.For(ctx).Debug("amqp error, delaying " + string(duration/time.Millisecond) + " millis: " + err.Error())
tab.For(ctx).Debug("amqp error, delaying " + string(duration/time.Millisecond) + " millis: " + err.Error())
time.Sleep(duration)
err = s.Recover(ctx)
if err != nil {
log.For(ctx).Debug("failed to recover connection")
tab.For(ctx).Debug("failed to recover connection")
} else {
log.For(ctx).Debug("recovered connection")
tab.For(ctx).Debug("recovered connection")
s.recoveryBackoff.Reset()
}
}
@ -227,20 +230,20 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
connection, err := s.hub.namespace.newConnection()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
s.connection = connection
err = s.hub.namespace.negotiateClaim(ctx, connection, s.getAddress())
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
amqpSession, err := connection.NewSession()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
@ -249,13 +252,13 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
amqp.LinkTargetAddress(s.getAddress()),
)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}
s.session, err = newSession(amqpSession)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return err
}

Просмотреть файл

@ -34,12 +34,12 @@ import (
"sync"
"time"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/Azure/azure-amqp-common-go/persist"
"github.com/Azure/azure-amqp-common-go/uuid"
"github.com/devigned/tab"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-event-hubs-go/eph"
"go.opencensus.io/trace"
"github.com/Azure/azure-event-hubs-go/persist"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"
@ -250,19 +250,19 @@ func (sl *LeaserCheckpointer) AcquireLease(ctx context.Context, partitionID stri
blobURL := sl.containerURL.NewBlobURL(partitionID)
lease, err := sl.getLease(ctx, partitionID)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, false, nil
}
res, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, false, err
}
uuidToken, err := uuid.NewV4()
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, false, err
}
@ -271,13 +271,13 @@ func (sl *LeaserCheckpointer) AcquireLease(ctx context.Context, partitionID stri
// is leased by someone else due to a race to acquire
_, err := blobURL.ChangeLease(ctx, lease.Token, newToken, azblob.ModifiedAccessConditions{})
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, false, err
}
} else {
_, err = blobURL.AcquireLease(ctx, newToken, int32(sl.leaseDuration.Round(time.Second).Seconds()), azblob.ModifiedAccessConditions{})
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, false, err
}
}
@ -309,7 +309,7 @@ func (sl *LeaserCheckpointer) RenewLease(ctx context.Context, partitionID string
_, err := blobURL.RenewLease(ctx, lease.Token, azblob.ModifiedAccessConditions{})
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, false, err
}
return lease, true, nil
@ -331,7 +331,7 @@ func (sl *LeaserCheckpointer) ReleaseLease(ctx context.Context, partitionID stri
_, err := blobURL.ReleaseLease(ctx, lease.Token, azblob.ModifiedAccessConditions{})
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return false, err
}
delete(sl.leases, partitionID)
@ -362,7 +362,7 @@ func (sl *LeaserCheckpointer) updateLease(ctx context.Context, partitionID strin
_, err := blobURL.RenewLease(ctx, lease.Token, azblob.ModifiedAccessConditions{})
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, false, err
}
@ -372,7 +372,7 @@ func (sl *LeaserCheckpointer) updateLease(ctx context.Context, partitionID strin
err = sl.uploadLease(ctx, lease)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
return nil, false, err
}
@ -483,7 +483,7 @@ func (sl *LeaserCheckpointer) persistLeases(ctx context.Context) {
default:
err := sl.persistDirtyPartitions(ctx)
if err != nil {
log.For(ctx).Error(err)
tab.For(ctx).Error(err)
}
<-time.After(sl.LeasePersistenceInterval)
}
@ -652,12 +652,12 @@ func (s *storageLease) String() string {
return string(bits)
}
func startConsumerSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) {
ctx, span := trace.StartSpan(ctx, operationName, opts...)
func startConsumerSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
ctx, span := tab.StartSpan(ctx, operationName)
eventhub.ApplyComponentInfo(span)
span.AddAttributes(
trace.StringAttribute("span.kind", "client"),
trace.StringAttribute("eh.eventprocessorhost.kind", "azure.storage"),
tab.StringAttribute("span.kind", "client"),
tab.StringAttribute("eh.eventprocessorhost.kind", "azure.storage"),
)
return span, ctx
}

Просмотреть файл

@ -6,82 +6,72 @@ import (
"os"
"strconv"
"go.opencensus.io/trace"
"github.com/devigned/tab"
)
func (h *Hub) startSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) {
ctx, span := trace.StartSpan(ctx, operationName, opts...)
func (h *Hub) startSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
ctx, span := tab.StartSpan(ctx, operationName)
ApplyComponentInfo(span)
return span, ctx
}
func (ns *namespace) startSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) {
ctx, span := trace.StartSpan(ctx, operationName, opts...)
func (ns *namespace) startSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
ctx, span := tab.StartSpan(ctx, operationName)
ApplyComponentInfo(span)
return span, ctx
}
func (s *sender) startProducerSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) {
ctx, span := trace.StartSpan(ctx, operationName, opts...)
func (s *sender) startProducerSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
ctx, span := tab.StartSpan(ctx, operationName)
ApplyComponentInfo(span)
span.AddAttributes(
trace.StringAttribute("span.kind", "producer"),
trace.StringAttribute("message_bus.destination", s.getFullIdentifier()),
tab.StringAttribute("span.kind", "producer"),
tab.StringAttribute("message_bus.destination", s.getFullIdentifier()),
)
return span, ctx
}
func (r *receiver) startConsumerSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) {
ctx, span := trace.StartSpan(ctx, operationName, opts...)
func (r *receiver) startConsumerSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
ctx, span := tab.StartSpan(ctx, operationName)
ApplyComponentInfo(span)
span.AddAttributes(
trace.StringAttribute("span.kind", "consumer"),
trace.StringAttribute("message_bus.destination", r.getFullIdentifier()),
tab.StringAttribute("span.kind", "consumer"),
tab.StringAttribute("message_bus.destination", r.getFullIdentifier()),
)
return span, ctx
}
func (r *receiver) startConsumerSpanFromWire(ctx context.Context, operationName string, reference trace.SpanContext, opts ...trace.StartOption) (*trace.Span, context.Context) {
ctx, span := trace.StartSpanWithRemoteParent(ctx, operationName, reference, opts...)
func (em *entityManager) startSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
ctx, span := tab.StartSpan(ctx, operationName)
ApplyComponentInfo(span)
span.AddAttributes(
trace.StringAttribute("span.kind", "consumer"),
trace.StringAttribute("message_bus.destination", r.getFullIdentifier()),
)
return span, ctx
}
func (em *entityManager) startSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) {
ctx, span := trace.StartSpan(ctx, operationName, opts...)
ApplyComponentInfo(span)
span.AddAttributes(trace.StringAttribute("span.kind", "client"))
span.AddAttributes(tab.StringAttribute("span.kind", "client"))
return span, ctx
}
// ApplyComponentInfo applies eventhub library and network info to the span
func ApplyComponentInfo(span *trace.Span) {
func ApplyComponentInfo(span tab.Spanner) {
span.AddAttributes(
trace.StringAttribute("component", "github.com/Azure/azure-event-hubs-go"),
trace.StringAttribute("version", Version))
tab.StringAttribute("component", "github.com/Azure/azure-event-hubs-go"),
tab.StringAttribute("version", Version))
applyNetworkInfo(span)
}
func applyNetworkInfo(span *trace.Span) {
func applyNetworkInfo(span tab.Spanner) {
hostname, err := os.Hostname()
if err == nil {
span.AddAttributes(trace.StringAttribute("peer.hostname", hostname))
span.AddAttributes(tab.StringAttribute("peer.hostname", hostname))
}
}
func applyRequestInfo(span *trace.Span, req *http.Request) {
func applyRequestInfo(span tab.Spanner, req *http.Request) {
span.AddAttributes(
trace.StringAttribute("http.url", req.URL.String()),
trace.StringAttribute("http.method", req.Method),
tab.StringAttribute("http.url", req.URL.String()),
tab.StringAttribute("http.method", req.Method),
)
}
func applyResponseInfo(span *trace.Span, res *http.Response) {
func applyResponseInfo(span tab.Spanner, res *http.Response) {
if res != nil {
span.AddAttributes(trace.StringAttribute("http.status_code", strconv.Itoa(res.StatusCode)))
span.AddAttributes(tab.StringAttribute("http.status_code", strconv.Itoa(res.StatusCode)))
}
}

Просмотреть файл

@ -2,5 +2,5 @@ package eventhub
const (
// Version is the semantic version number
Version = "1.3.1"
Version = "2.0.0"
)