Merge pull request #36 from Azure/tracing
**Breaking Change** remove persist and log packages
This commit is contained in:
Коммит
d021cd0bfe
18
.travis.yml
18
.travis.yml
|
@ -2,19 +2,19 @@ language: go
|
|||
sudo: false
|
||||
go:
|
||||
- 1.x
|
||||
- tip
|
||||
|
||||
env:
|
||||
- GO111MODULE=on
|
||||
- 1.11.x
|
||||
|
||||
matrix:
|
||||
allow_failures:
|
||||
- go: tip
|
||||
fast_finish: true
|
||||
|
||||
before_install:
|
||||
- go get github.com/fzipp/gocyclo
|
||||
- go get honnef.co/go/tools/cmd/megacheck
|
||||
- cd ${TRAVIS_HOME}
|
||||
- go get github.com/mattn/goveralls
|
||||
- go get golang.org/x/tools/cmd/cover
|
||||
- go get github.com/fzipp/gocyclo
|
||||
- go get golang.org/x/lint/golint
|
||||
- cd ${TRAVIS_BUILD_DIR}
|
||||
|
||||
script:
|
||||
make test
|
||||
- export GO111MODULE=on
|
||||
- make test
|
12
Makefile
12
Makefile
|
@ -19,7 +19,7 @@ M = $(shell printf "\033[34;1m▶\033[0m")
|
|||
TIMEOUT = 360
|
||||
|
||||
.PHONY: all
|
||||
all: fmt go.sum lint vet megacheck | $(BASE) ; $(info $(M) building library…) @ ## Build program
|
||||
all: fmt go.sum lint vet tidy | $(BASE) ; $(info $(M) building library…) @ ## Build program
|
||||
$Q cd $(BASE) && $(GO) build \
|
||||
-tags release \
|
||||
-ldflags '-X $(PACKAGE)/cmd.Version=$(VERSION) -X $(PACKAGE)/cmd.BuildDate=$(DATE)' \
|
||||
|
@ -35,6 +35,10 @@ GOLINT = $(BIN)/golint
|
|||
$(BIN)/golint: | $(BASE) ; $(info $(M) building golint…)
|
||||
$Q go get -u golang.org/x/lint/golint
|
||||
|
||||
.PHONY: tidy
|
||||
tidy: ; $(info $(M) running tidy…) @ ## Run tidy
|
||||
$Q $(GO) mod tidy
|
||||
|
||||
# Tests
|
||||
|
||||
TEST_TARGETS := test-default test-bench test-short test-verbose test-race test-debug
|
||||
|
@ -47,7 +51,7 @@ test-race: ARGS=-race ## Run tests with race detector
|
|||
test-cover: ARGS=-cover ## Run tests in verbose mode with coverage
|
||||
$(TEST_TARGETS): NAME=$(MAKECMDGOALS:test-%=%)
|
||||
$(TEST_TARGETS): test
|
||||
check test tests: cyclo lint vet go.sum megacheck | $(BASE) ; $(info $(M) running $(NAME:%=% )tests…) @ ## Run tests
|
||||
check test tests: cyclo lint vet go.sum | $(BASE) ; $(info $(M) running $(NAME:%=% )tests…) @ ## Run tests
|
||||
$Q cd $(BASE) && $(GO) test -timeout $(TIMEOUT)s $(ARGS) $(TESTPKGS)
|
||||
|
||||
.PHONY: vet
|
||||
|
@ -60,10 +64,6 @@ lint: go.sum | $(BASE) $(GOLINT) ; $(info $(M) running golint…) @ ## Run golin
|
|||
test -z "$$($(GOLINT) $$pkg | tee /dev/stderr)" || ret=1 ; \
|
||||
done ; exit $$ret
|
||||
|
||||
.PHONY: megacheck
|
||||
megacheck: go.sum | $(BASE) ; $(info $(M) running megacheck…) @ ## Run megacheck
|
||||
$Q cd $(BASE) && megacheck
|
||||
|
||||
.PHONY: fmt
|
||||
fmt: ; $(info $(M) running gofmt…) @ ## Run gofmt on all source files
|
||||
@ret=0 && for d in $$($(GO) list -f '{{.Dir}}' ./...); do \
|
||||
|
|
18
README.md
18
README.md
|
@ -4,15 +4,23 @@
|
|||
[![Build Status](https://travis-ci.org/Azure/azure-amqp-common-go.svg?branch=master)](https://travis-ci.org/Azure/azure-amqp-common-go)
|
||||
|
||||
This project contains reusable components for AMQP based services like Event Hub and Service Bus. You will find
|
||||
abstractions over authentication, claims-based security, connection string parsing, checkpointing and RPC for AMQP.
|
||||
abstractions over authentication, claims-based security, connection string parsing and RPC for AMQP.
|
||||
|
||||
If you are looking for the Azure Event Hub library for go, you can find it [here](https://github.com/Azure/azure-event-hubs-go).
|
||||
If you are looking for the Azure Event Hub library for go, you can find it [here](https://aka.ms/azure-event-hubs-go).
|
||||
|
||||
## Install
|
||||
If you are looking for the Azure Service Bus library for go, you can find it [here](https://aka.ms/azure-service-bus-go).
|
||||
|
||||
### via go get
|
||||
## Install with Go modules
|
||||
If you want to use stable versions of the library, please use Go modules.
|
||||
|
||||
### Using go get targeting version 2.x.x
|
||||
``` bash
|
||||
go get github.com/Azure/azure-amqp-common-go
|
||||
go get -u github.com/Azure/azure-amqp-common-go/v2
|
||||
```
|
||||
|
||||
### Using go get targeting version 1.x.x
|
||||
``` bash
|
||||
go get -u github.com/Azure/azure-amqp-common-go
|
||||
```
|
||||
|
||||
## Contributing
|
||||
|
|
|
@ -33,10 +33,11 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/auth"
|
||||
"github.com/Azure/go-autorest/autorest/adal"
|
||||
"github.com/Azure/go-autorest/autorest/azure"
|
||||
"golang.org/x/crypto/pkcs12"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/v2/auth"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
17
cbs/cbs.go
17
cbs/cbs.go
|
@ -29,11 +29,12 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/auth"
|
||||
"github.com/Azure/azure-amqp-common-go/internal/tracing"
|
||||
"github.com/Azure/azure-amqp-common-go/log"
|
||||
"github.com/Azure/azure-amqp-common-go/rpc"
|
||||
"github.com/devigned/tab"
|
||||
"pack.ag/amqp"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/v2/auth"
|
||||
"github.com/Azure/azure-amqp-common-go/v2/internal/tracing"
|
||||
"github.com/Azure/azure-amqp-common-go/v2/rpc"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -47,7 +48,7 @@ const (
|
|||
|
||||
// NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
|
||||
func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, provider auth.TokenProvider) error {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.cbs.NegotiateClaim")
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.cbs.NegotiateClaim")
|
||||
defer span.End()
|
||||
|
||||
link, err := rpc.NewLink(conn, cbsAddress)
|
||||
|
@ -61,7 +62,7 @@ func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, pro
|
|||
return err
|
||||
}
|
||||
|
||||
log.For(ctx).Debug(fmt.Sprintf("negotiating claim for audience %s with token type %s and expiry of %s", audience, token.TokenType, token.Expiry))
|
||||
tab.For(ctx).Debug(fmt.Sprintf("negotiating claim for audience %s with token type %s and expiry of %s", audience, token.TokenType, token.Expiry))
|
||||
msg := &amqp.Message{
|
||||
Value: token.Token,
|
||||
ApplicationProperties: map[string]interface{}{
|
||||
|
@ -74,10 +75,10 @@ func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, pro
|
|||
|
||||
res, err := link.RetryableRPC(ctx, 3, 1*time.Second, msg)
|
||||
if err != nil {
|
||||
log.For(ctx).Error(err)
|
||||
tab.For(ctx).Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.For(ctx).Debug(fmt.Sprintf("negotiated with response code %d and message: %s", res.Code, res.Description))
|
||||
tab.For(ctx).Debug(fmt.Sprintf("negotiated with response code %d and message: %s", res.Code, res.Description))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
# Change Log
|
||||
|
||||
## `v2.0.0`
|
||||
- [**breaking change** remove persist and move into the Event Hubs package](https://github.com/Azure/azure-event-hubs-go/pull/112)
|
||||
- **breaking change** remove log package in favor of https://github.com/devigned/tab
|
||||
|
||||
## `v1.1.4`
|
||||
- allow status description on RPC calls to be empty without returning an error https://github.com/Azure/azure-event-hubs-go/issues/88
|
||||
|
||||
|
|
17
go.mod
17
go.mod
|
@ -1,17 +1,18 @@
|
|||
module github.com/Azure/azure-amqp-common-go
|
||||
module github.com/Azure/azure-amqp-common-go/v2
|
||||
|
||||
go 1.12
|
||||
|
||||
require (
|
||||
github.com/Azure/azure-sdk-for-go v21.3.0+incompatible // indirect
|
||||
github.com/Azure/go-autorest v11.0.0+incompatible
|
||||
contrib.go.opencensus.io/exporter/ocagent v0.5.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go v29.0.0+incompatible // indirect
|
||||
github.com/Azure/go-autorest v12.0.0+incompatible
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/devigned/tab v0.1.1
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
|
||||
github.com/fortytw2/leaktest v1.2.0 // indirect
|
||||
github.com/google/go-cmp v0.2.0 // indirect
|
||||
github.com/pkg/errors v0.8.0 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/stretchr/testify v1.2.2
|
||||
go.opencensus.io v0.15.0
|
||||
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4
|
||||
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 // indirect
|
||||
pack.ag/amqp v0.8.0
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
|
||||
pack.ag/amqp v0.11.0
|
||||
)
|
||||
|
|
90
go.sum
90
go.sum
|
@ -1,26 +1,92 @@
|
|||
github.com/Azure/azure-sdk-for-go v21.3.0+incompatible h1:YFvAka2WKAl2xnJkYV1e1b7E2z88AgFszDzWU18ejMY=
|
||||
github.com/Azure/azure-sdk-for-go v21.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/go-autorest v11.0.0+incompatible h1:eiSNFBmiWLzcjYsTkq8XZ0KRlvirsX0eyFQ8ZyCTgiQ=
|
||||
github.com/Azure/go-autorest v11.0.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
|
||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
contrib.go.opencensus.io/exporter/ocagent v0.5.0 h1:TKXjQSRS0/cCDrP7KvkgU6SmILtF/yV2TOs/02K/WZQ=
|
||||
contrib.go.opencensus.io/exporter/ocagent v0.5.0/go.mod h1:ImxhfLRpxoYiSq891pBrLVhN+qmP8BTVvdH2YLs7Gl0=
|
||||
github.com/Azure/azure-sdk-for-go v29.0.0+incompatible h1:CYPU39ULbGjQBo3gXIqiWouK0C4F+Pt2Zx5CqGvqknE=
|
||||
github.com/Azure/azure-sdk-for-go v29.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/go-autorest v12.0.0+incompatible h1:N+VqClcomLGD/sHb3smbSYYtNMgKpVV3Cd5r5i8z6bQ=
|
||||
github.com/Azure/go-autorest v12.0.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
|
||||
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/fortytw2/leaktest v1.2.0 h1:cj6GCiwJDH7l3tMHLjZDo0QqPtrXJiWSI9JgpeQKw+Q=
|
||||
github.com/fortytw2/leaktest v1.2.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
|
||||
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
|
||||
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
|
||||
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
|
||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
||||
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
go.opencensus.io v0.15.0 h1:r1SzcjSm4ybA0qZs3B4QYX072f8gK61Kh0qtwyFpfdk=
|
||||
go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
|
||||
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4 h1:Vk3wNqEZwyGyei9yq5ekj7frek2u7HUfffJ1/opblzc=
|
||||
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM=
|
||||
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
pack.ag/amqp v0.8.0 h1:JT0f88Hsbo5D+s8bBdleDOHvMDoYcaBW6GplAUqtxC4=
|
||||
pack.ag/amqp v0.8.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
||||
go.opencensus.io v0.21.0 h1:mU6zScU4U1YAFPHEHYk+3JC4SY7JxgkqS10ZOSyksNg=
|
||||
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0Is3p+EHBKNgquIzo1OI=
|
||||
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
google.golang.org/api v0.4.0 h1:KKgc1aqhV8wDPbDzlDtpvyjZFY3vjz85FP7p4wcQUyI=
|
||||
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19 h1:Lj2SnHtxkRGJDqnGaSjo+CCdIieEnwVazbOXILwQemk=
|
||||
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
|
||||
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
|
||||
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
pack.ag/amqp v0.11.0 h1:ot/IA0enDkt4/c8xfbCO7AZzjM4bHys/UffnFmnHUnU=
|
||||
pack.ag/amqp v0.11.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
||||
|
|
|
@ -4,28 +4,29 @@ import (
|
|||
"context"
|
||||
"os"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/internal"
|
||||
"go.opencensus.io/trace"
|
||||
"github.com/devigned/tab"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/v2/internal"
|
||||
)
|
||||
|
||||
// StartSpanFromContext starts a span given a context and applies common library information
|
||||
func StartSpanFromContext(ctx context.Context, operationName string, opts ...trace.StartOption) (*trace.Span, context.Context) {
|
||||
ctx, span := trace.StartSpan(ctx, operationName, opts...)
|
||||
func StartSpanFromContext(ctx context.Context, operationName string) (context.Context, tab.Spanner) {
|
||||
ctx, span := tab.StartSpan(ctx, operationName)
|
||||
ApplyComponentInfo(span)
|
||||
return span, ctx
|
||||
return ctx, span
|
||||
}
|
||||
|
||||
// ApplyComponentInfo applies eventhub library and network info to the span
|
||||
func ApplyComponentInfo(span *trace.Span) {
|
||||
func ApplyComponentInfo(span tab.Spanner) {
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute("component", "github.com/Azure/azure-amqp-common-go"),
|
||||
trace.StringAttribute("version", common.Version))
|
||||
tab.StringAttribute("component", "github.com/Azure/azure-amqp-common-go"),
|
||||
tab.StringAttribute("version", common.Version))
|
||||
applyNetworkInfo(span)
|
||||
}
|
||||
|
||||
func applyNetworkInfo(span *trace.Span) {
|
||||
func applyNetworkInfo(span tab.Spanner) {
|
||||
hostname, err := os.Hostname()
|
||||
if err == nil {
|
||||
span.AddAttributes(trace.StringAttribute("peer.hostname", hostname))
|
||||
span.AddAttributes(tab.StringAttribute("peer.hostname", hostname))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,5 +2,5 @@ package common
|
|||
|
||||
const (
|
||||
// Version is the semantic version of the library
|
||||
Version = "1.1.4"
|
||||
Version = "2.0.0"
|
||||
)
|
||||
|
|
|
@ -1,61 +0,0 @@
|
|||
package log
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
type (
|
||||
// Logger is the interface for opentracing logging
|
||||
Logger interface {
|
||||
Info(msg string, attributes ...trace.Attribute)
|
||||
Error(err error, attributes ...trace.Attribute)
|
||||
Fatal(msg string, attributes ...trace.Attribute)
|
||||
Debug(msg string, attributes ...trace.Attribute)
|
||||
}
|
||||
|
||||
spanLogger struct {
|
||||
span *trace.Span
|
||||
}
|
||||
|
||||
nopLogger struct{}
|
||||
)
|
||||
|
||||
// For will return a logger for a given context
|
||||
func For(ctx context.Context) Logger {
|
||||
if span := trace.FromContext(ctx); span != nil {
|
||||
return &spanLogger{
|
||||
span: span,
|
||||
}
|
||||
}
|
||||
return new(nopLogger)
|
||||
}
|
||||
|
||||
func (sl spanLogger) Info(msg string, attributes ...trace.Attribute) {
|
||||
sl.logToSpan("info", msg, attributes...)
|
||||
}
|
||||
|
||||
func (sl spanLogger) Error(err error, attributes ...trace.Attribute) {
|
||||
attributes = append(attributes, trace.BoolAttribute("error", true))
|
||||
sl.logToSpan("error", err.Error(), attributes...)
|
||||
}
|
||||
|
||||
func (sl spanLogger) Fatal(msg string, attributes ...trace.Attribute) {
|
||||
attributes = append(attributes, trace.BoolAttribute("error", true))
|
||||
sl.logToSpan("fatal", msg, attributes...)
|
||||
}
|
||||
|
||||
func (sl spanLogger) Debug(msg string, attributes ...trace.Attribute) {
|
||||
sl.logToSpan("debug", msg, attributes...)
|
||||
}
|
||||
|
||||
func (sl spanLogger) logToSpan(level string, msg string, attributes ...trace.Attribute) {
|
||||
attrs := append(attributes, trace.StringAttribute("event", msg), trace.StringAttribute("level", level))
|
||||
sl.span.AddAttributes(attrs...)
|
||||
}
|
||||
|
||||
func (sl nopLogger) Info(msg string, attributes ...trace.Attribute) {}
|
||||
func (sl nopLogger) Error(err error, attributes ...trace.Attribute) {}
|
||||
func (sl nopLogger) Fatal(msg string, attributes ...trace.Attribute) {}
|
||||
func (sl nopLogger) Debug(msg string, attributes ...trace.Attribute) {}
|
|
@ -1,69 +0,0 @@
|
|||
package persist
|
||||
|
||||
// MIT License
|
||||
//
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
// of this software and associated documentation files (the "Software"), to deal
|
||||
// in the Software without restriction, including without limitation the rights
|
||||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
// copies of the Software, and to permit persons to whom the Software is
|
||||
// furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
// SOFTWARE
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// StartOfStream is a constant defined to represent the start of a partition stream in EventHub.
|
||||
StartOfStream = "-1"
|
||||
|
||||
// EndOfStream is a constant defined to represent the current end of a partition stream in EventHub.
|
||||
// This can be used as an offset argument in receiver creation to start receiving from the latest
|
||||
// event, instead of a specific offset or point in time.
|
||||
EndOfStream = "@latest"
|
||||
)
|
||||
|
||||
type (
|
||||
// Checkpoint is the information needed to determine the last message processed
|
||||
Checkpoint struct {
|
||||
Offset string `json:"offset"`
|
||||
SequenceNumber int64 `json:"sequenceNumber"`
|
||||
EnqueueTime time.Time `json:"enqueueTime"`
|
||||
}
|
||||
)
|
||||
|
||||
// NewCheckpointFromStartOfStream returns a checkpoint for the start of the stream
|
||||
func NewCheckpointFromStartOfStream() Checkpoint {
|
||||
return Checkpoint{
|
||||
Offset: StartOfStream,
|
||||
}
|
||||
}
|
||||
|
||||
// NewCheckpointFromEndOfStream returns a checkpoint for the end of the stream
|
||||
func NewCheckpointFromEndOfStream() Checkpoint {
|
||||
return Checkpoint{
|
||||
Offset: EndOfStream,
|
||||
}
|
||||
}
|
||||
|
||||
// NewCheckpoint contains the information needed to checkpoint Event Hub progress
|
||||
func NewCheckpoint(offset string, sequence int64, enqueueTime time.Time) Checkpoint {
|
||||
return Checkpoint{
|
||||
Offset: offset,
|
||||
SequenceNumber: sequence,
|
||||
EnqueueTime: enqueueTime,
|
||||
}
|
||||
}
|
100
persist/file.go
100
persist/file.go
|
@ -1,100 +0,0 @@
|
|||
package persist
|
||||
|
||||
// MIT License
|
||||
//
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
// of this software and associated documentation files (the "Software"), to deal
|
||||
// in the Software without restriction, including without limitation the rights
|
||||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
// copies of the Software, and to permit persons to whom the Software is
|
||||
// furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
// SOFTWARE
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type (
|
||||
// FilePersister implements CheckpointPersister for saving to the file system
|
||||
FilePersister struct {
|
||||
directory string
|
||||
mu sync.Mutex
|
||||
}
|
||||
)
|
||||
|
||||
// NewFilePersister creates a FilePersister for saving to a given directory
|
||||
func NewFilePersister(directory string) (*FilePersister, error) {
|
||||
err := os.MkdirAll(directory, 0777)
|
||||
return &FilePersister{
|
||||
directory: directory,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (fp *FilePersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error {
|
||||
fp.mu.Lock()
|
||||
defer fp.mu.Unlock()
|
||||
|
||||
key := getFilePath(namespace, name, consumerGroup, partitionID)
|
||||
filePath := path.Join(fp.directory, key)
|
||||
bits, err := json.Marshal(checkpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
file, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = file.Write(bits)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return file.Close()
|
||||
}
|
||||
|
||||
func (fp *FilePersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error) {
|
||||
fp.mu.Lock()
|
||||
defer fp.mu.Unlock()
|
||||
|
||||
key := getFilePath(namespace, name, consumerGroup, partitionID)
|
||||
filePath := path.Join(fp.directory, key)
|
||||
|
||||
f, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return NewCheckpointFromStartOfStream(), nil
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
_, err = io.Copy(buf, f)
|
||||
if err != nil {
|
||||
return NewCheckpointFromStartOfStream(), err
|
||||
}
|
||||
|
||||
var checkpoint Checkpoint
|
||||
err = json.Unmarshal(buf.Bytes(), &checkpoint)
|
||||
return checkpoint, err
|
||||
}
|
||||
|
||||
func getFilePath(namespace, name, consumerGroup, partitionID string) string {
|
||||
key := strings.Join([]string{namespace, name, consumerGroup, partitionID}, "_")
|
||||
return strings.Replace(key, "$", "", -1)
|
||||
}
|
|
@ -1,80 +0,0 @@
|
|||
package persist
|
||||
|
||||
// MIT License
|
||||
//
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
// of this software and associated documentation files (the "Software"), to deal
|
||||
// in the Software without restriction, including without limitation the rights
|
||||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
// copies of the Software, and to permit persons to whom the Software is
|
||||
// furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
// SOFTWARE
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
var (
|
||||
letterRunes = []rune("abcdefghijklmnopqrstuvwxyz123456789")
|
||||
)
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().Unix())
|
||||
}
|
||||
|
||||
func TestFilePersister_Read(t *testing.T) {
|
||||
namespace := "namespace"
|
||||
name := "name"
|
||||
group := "$Default"
|
||||
partitionID := "0"
|
||||
dir := path.Join(os.TempDir(), RandomName("read", 4))
|
||||
persister, err := NewFilePersister(dir)
|
||||
assert.Nil(t, err)
|
||||
ckp, err := persister.Read(namespace, name, group, partitionID)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, NewCheckpointFromStartOfStream(), ckp)
|
||||
}
|
||||
|
||||
func TestFilePersister_Write(t *testing.T) {
|
||||
namespace := "namespace"
|
||||
name := "name"
|
||||
group := "$Default"
|
||||
partitionID := "0"
|
||||
dir := path.Join(os.TempDir(), RandomName("write", 4))
|
||||
persister, err := NewFilePersister(dir)
|
||||
assert.Nil(t, err)
|
||||
ckp := NewCheckpoint("120", 22, time.Now())
|
||||
err = persister.Write(namespace, name, group, partitionID, ckp)
|
||||
assert.Nil(t, err)
|
||||
ckp2, err := persister.Read(namespace, name, group, partitionID)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, ckp.Offset, ckp2.Offset)
|
||||
assert.Equal(t, ckp.SequenceNumber, ckp2.SequenceNumber)
|
||||
}
|
||||
|
||||
// RandomName generates a random Event Hub name
|
||||
func RandomName(prefix string, length int) string {
|
||||
b := make([]rune, length)
|
||||
for i := range b {
|
||||
b[i] = letterRunes[rand.Intn(len(letterRunes))]
|
||||
}
|
||||
return prefix + "-" + string(b)
|
||||
}
|
|
@ -1,81 +0,0 @@
|
|||
// Package persist provides abstract structures for checkpoint persistence.
|
||||
package persist
|
||||
|
||||
// MIT License
|
||||
//
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
// of this software and associated documentation files (the "Software"), to deal
|
||||
// in the Software without restriction, including without limitation the rights
|
||||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
// copies of the Software, and to permit persons to whom the Software is
|
||||
// furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
// SOFTWARE
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type (
|
||||
// CheckpointPersister provides persistence for the received offset for a given namespace, hub name, consumer group, partition Id and
|
||||
// offset so that if a receiver where to be interrupted, it could resume after the last consumed event.
|
||||
CheckpointPersister interface {
|
||||
Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error
|
||||
Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)
|
||||
}
|
||||
|
||||
// MemoryPersister is a default implementation of a Hub CheckpointPersister, which will persist offset information in
|
||||
// memory.
|
||||
MemoryPersister struct {
|
||||
values map[string]Checkpoint
|
||||
mu sync.Mutex
|
||||
}
|
||||
)
|
||||
|
||||
// NewMemoryPersister creates a new in-memory storage for checkpoints
|
||||
//
|
||||
// MemoryPersister is only intended to be shared with EventProcessorHosts within the same process. This implementation
|
||||
// is a toy. You should probably use the Azure Storage implementation or any other that provides durable storage for
|
||||
// checkpoints.
|
||||
func NewMemoryPersister() *MemoryPersister {
|
||||
return &MemoryPersister{
|
||||
values: make(map[string]Checkpoint),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *MemoryPersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
key := getPersistenceKey(namespace, name, consumerGroup, partitionID)
|
||||
p.values[key] = checkpoint
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *MemoryPersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
key := getPersistenceKey(namespace, name, consumerGroup, partitionID)
|
||||
if offset, ok := p.values[key]; ok {
|
||||
return offset, nil
|
||||
}
|
||||
return NewCheckpointFromStartOfStream(), fmt.Errorf("could not read the offset for the key %s", key)
|
||||
}
|
||||
|
||||
func getPersistenceKey(namespace, name, consumerGroup, partitionID string) string {
|
||||
return path.Join(namespace, name, consumerGroup, partitionID)
|
||||
}
|
47
rpc/rpc.go
47
rpc/rpc.go
|
@ -31,11 +31,12 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go"
|
||||
"github.com/Azure/azure-amqp-common-go/internal/tracing"
|
||||
"github.com/Azure/azure-amqp-common-go/log"
|
||||
"github.com/Azure/azure-amqp-common-go/uuid"
|
||||
"github.com/devigned/tab"
|
||||
"pack.ag/amqp"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/v2"
|
||||
"github.com/Azure/azure-amqp-common-go/v2/internal/tracing"
|
||||
"github.com/Azure/azure-amqp-common-go/v2/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -70,12 +71,11 @@ func NewLink(conn *amqp.Client, address string) (*Link, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return NewLinkWithSession(conn, authSession, address)
|
||||
return NewLinkWithSession(authSession, address)
|
||||
}
|
||||
|
||||
// NewLinkWithSession will build a new request response link, but will reuse an existing AMQP session
|
||||
func NewLinkWithSession(conn *amqp.Client, session *amqp.Session, address string) (*Link, error) {
|
||||
|
||||
func NewLinkWithSession(session *amqp.Session, address string) (*Link, error) {
|
||||
authSender, err := session.NewSender(
|
||||
amqp.LinkTargetAddress(address),
|
||||
)
|
||||
|
@ -109,30 +109,30 @@ func NewLinkWithSession(conn *amqp.Client, session *amqp.Session, address string
|
|||
|
||||
// RetryableRPC attempts to retry a request a number of times with delay
|
||||
func (l *Link) RetryableRPC(ctx context.Context, times int, delay time.Duration, msg *amqp.Message) (*Response, error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC")
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC")
|
||||
defer span.End()
|
||||
|
||||
res, err := common.Retry(times, delay, func() (interface{}, error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC.retry")
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC.retry")
|
||||
defer span.End()
|
||||
|
||||
res, err := l.RPC(ctx, msg)
|
||||
if err != nil {
|
||||
log.For(ctx).Error(fmt.Errorf("error in RPC via link %s: %v", l.id, err))
|
||||
tab.For(ctx).Error(fmt.Errorf("error in RPC via link %s: %v", l.id, err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch {
|
||||
case res.Code >= 200 && res.Code < 300:
|
||||
log.For(ctx).Debug(fmt.Sprintf("successful rpc on link %s: status code %d and description: %s", l.id, res.Code, res.Description))
|
||||
tab.For(ctx).Debug(fmt.Sprintf("successful rpc on link %s: status code %d and description: %s", l.id, res.Code, res.Description))
|
||||
return res, nil
|
||||
case res.Code >= 500:
|
||||
errMessage := fmt.Sprintf("server error link %s: status code %d and description: %s", l.id, res.Code, res.Description)
|
||||
log.For(ctx).Error(errors.New(errMessage))
|
||||
tab.For(ctx).Error(errors.New(errMessage))
|
||||
return nil, common.Retryable(errMessage)
|
||||
default:
|
||||
errMessage := fmt.Sprintf("unhandled error link %s: status code %d and description: %s", l.id, res.Code, res.Description)
|
||||
log.For(ctx).Error(errors.New(errMessage))
|
||||
tab.For(ctx).Error(errors.New(errMessage))
|
||||
return nil, common.Retryable(errMessage)
|
||||
}
|
||||
})
|
||||
|
@ -149,7 +149,7 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
|
|||
l.rpcMu.Lock()
|
||||
defer l.rpcMu.Unlock()
|
||||
|
||||
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RPC")
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RPC")
|
||||
defer span.End()
|
||||
|
||||
if msg.Properties == nil {
|
||||
|
@ -205,17 +205,22 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
|
|||
}
|
||||
}
|
||||
|
||||
res.Accept()
|
||||
return &Response{
|
||||
response := &Response{
|
||||
Code: int(statusCode),
|
||||
Description: description,
|
||||
Message: res,
|
||||
}, err
|
||||
}
|
||||
|
||||
if err := res.Accept(); err != nil {
|
||||
return response, err
|
||||
}
|
||||
|
||||
return response, err
|
||||
}
|
||||
|
||||
// Close the link receiver, sender and session
|
||||
func (l *Link) Close(ctx context.Context) error {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.Close")
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.Close")
|
||||
defer span.End()
|
||||
|
||||
if err := l.closeReceiver(ctx); err != nil {
|
||||
|
@ -233,7 +238,7 @@ func (l *Link) Close(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (l *Link) closeReceiver(ctx context.Context) error {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeReceiver")
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeReceiver")
|
||||
defer span.End()
|
||||
|
||||
if l.receiver != nil {
|
||||
|
@ -243,7 +248,7 @@ func (l *Link) closeReceiver(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (l *Link) closeSender(ctx context.Context) error {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSender")
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSender")
|
||||
defer span.End()
|
||||
|
||||
if l.sender != nil {
|
||||
|
@ -253,7 +258,7 @@ func (l *Link) closeSender(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (l *Link) closeSession(ctx context.Context) error {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSession")
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSession")
|
||||
defer span.End()
|
||||
|
||||
if l.session != nil {
|
||||
|
|
|
@ -36,8 +36,8 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-amqp-common-go/auth"
|
||||
"github.com/Azure/azure-amqp-common-go/conn"
|
||||
"github.com/Azure/azure-amqp-common-go/v2/auth"
|
||||
"github.com/Azure/azure-amqp-common-go/v2/conn"
|
||||
)
|
||||
|
||||
type (
|
||||
|
|
Загрузка…
Ссылка в новой задаче