Merge pull request #83 from Azure/sender

Fix sender recovery on temporary network errors and config lease persistence
This commit is contained in:
David Justice 2018-12-12 14:08:32 -08:00 коммит произвёл GitHub
Родитель a7f5b65899 bcde45eee7
Коммит 36f07d03d0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
16 изменённых файлов: 351 добавлений и 185 удалений

10
.gitignore поставляемый
Просмотреть файл

@ -17,3 +17,13 @@ vendor
.idea
.DS_Store
.env
# Test Infrastructure
terraform.tfvars
*.auto.tfvars
*.tfstate
*.tfstate.backup
.terraform/
.terraform.tfstate.lock.info

Просмотреть файл

@ -12,22 +12,24 @@ before_install:
- cd ${TRAVIS_BUILD_DIR}
jobs:
include:
- stage: integration tests
if: type IN (push, cron)
env:
- secure: epA3y311AWEIyCYnap2c9weBnnKw8y5i12ETD/OrvYQxrDeFKf+Y8knKtUYxTWa60qGVy04jeO1DNPJHEezU3cwn9FQK/8Gply8Aoh8/OdD/gMjGNRKsJR/bPcbVj9RvfEoX/gzlgTydgC2/o7cUlEUZPx0NbiTqhGbozYMe7ET+3bElsbkPtVl7oBucUohtHK+AlK6RHYQjp1Rps8DXj0JptjxNLdWu6yRNFd5MzLWpIHVtpcwbAbK9PqQ2Mh7pKiwRUrdfUN9d+voOSZmZs6xyw3WmjN6YWp34k+53ebxE3dxEJlbo3NYdNB+BgmzS7TMIezm6AEYxAhsNGabf/b+KMmcncwuAc9XEJ2TTdWbFXygHPzNMB6awBeFQRM2BCgBN5oTu+RDaQDyx+gswd8sV6jittKZkCPZHBxWH8Y8AmLUKdaZO+WD9dPN44g2hlbNBshPdTORvQvOjH6yhbJsHs/CZn1va9D+LwL+H3IBl1ZY413gja1UqoSoQSCU0wWbKQSHG4ZxLzYqC27Xrcc+tLZD9fyJHUlqlcwR8OLnrno+WsuetxeIIxuJuXrCO6boYMFpgAlJfulMG0tXPSptVXVDhh3RsnK23MYke05RyuRnH3dGpyOJRNjjPYCdagmJPp+XPB2LrlUEhU0zFMxBJXIwhuBAUttx/mNpgRVA=
- secure: iJhPsOMf7UaNQE0bFwWya8kAPwPAoO5tra0jtWEBnME50AJmIP9oLHYDCdMwC8wQb+DqiTNz/auLFtcLt6yqKv+QEu6Ikw0jyaYFVi2bSfmUn6dbQ2Wqd8a4+uVcWNRcAB7eiurn0A8/DhPH/7AVPJlh+kxoX92FusfX3ZCdu29gnu7t9j2YndeY/rXSLwLKkbY2ympt/Vk7rLPjTKVTNK/XMrtSrmWq3wlpYzx498bcefcn1YpgovAeu85nlot6TkgrshYKUMLjxhb10lPhgUCgaW1gnELuem9i3zP8aV3GiP/wlPvtJ+Ts2Ciqn4PTvzVM/ZpFCxJnhLExrQwnVCdcV4jVUhFmrnbgczaPkzWFgNTl/dl32SIxbA5cjW+SJt7F+UkpdBMHWz59zVrTmYrkpIE9W82PS27mkCXdNkPjO03zjXcFb4IvePbIGuCgqhgcHdq1V6TOX5Mw7yFEP8r+jVDJp7xYB3yc3e6gsGNF3fjFC9jASRUAzaRG8w8oO4neDYpCB1np6qmQmNyDXjoRB+B6dDQN24O9UbXdWz5Pu9jvE47RqwozaftugUmgW/uikTs81rqFRV9hck9FOfaGODO6Bppf0L5jSsk9fvUGXvYeBZvq4ipZHOBvqUFRjblCVV0EVewAhlwc9Z4z42sXFJCCLz4zDCYOFLsYWkc=
- secure: A0EyBrCJYGHnCAfzzAupoFz2K96rHB4aJmq3tQZ/W1nllxh0eW0jNEXyAGZG13NLxACw9U7oxM1FAuKTZoKQ9XxeJ0D7jMQjtVBWoDQlB1prK8L+NkHNWCt5KkJ3vtx3Wn9GDWmxCnPyKi+FkaartPPuR+12oAawCWyX/LRg0eHMTTgvb6xtGEvUAdkF4ocO4I9vFsiJLu5vXaavgGCUJJVelIXV8zkXpLE//4Fm1DOuiJGB+EMhpEsACgW++DpTAtVqmFaCh12c4vWsvC0JyrpFe2Oe6Q1nbrEQtNxHMTRCJht3VBKYcuLdxq+az2xZUizU9YlCuOgxsqTxw0YXehMlIIl8MdIGWK1PccGouvTCB5Qzsv6MzF5LMp/GvechRDLwMd5xWhqXWhmHJfmn1XXCUv6RwrX9RSfrclWTUQMCknNWkP2GpSnkJqTvZ8CyiZMyahiJZC5OhnM/vYYl5FSACxs5vYaMZvz7hnr8YktwOGRK60RSD+SXnef+S3D6rLtTfkuZSUsK1gUjLziz3Bviz3JXOTKLH3HN0BWuQ0jawS2DnbvV99OYLOjjklGP8xU2FWGMjQiwX9arux8jsNU1Xss+F2yB2ElVuh+L93kn/WviD+Au0GNH4OdXVa8sXBXi+L7BMKi++hbcV+4hl1hhaglmAS9AtSHrlggi6aQ=
- secure: piCzL83qx3K9TizHqrOzgJCgbIpV2yhOYaYzCF9SXdeKTUIEPYi2jsbEJ5AzaTggyeXd/qqqfIeMuWHyIuWQtN+eTk9XeHR+s8QCeLAe1vvygq4NZcIGs5i2/9BYVkTpTR4kTnUut5FTUcthTFLI8fxpSAqWEE0JCj6eHKO9I6RFgpmN2HI6nihu9Wsu6PKBD8z8n1/Zj8x+iDtdTSlGTVPWEypfYzPzi+0tNEl5MpxV08niQZqKE0yIlbEXp5/fAMsOP3zlv8z9t52A11zmfvfXY22YdC3yw7ZKsPQ2QFUzNKkEBEJDkMXBEYMW7SgWMtc3GnBI98NJbJ83OkrebTCigcy9QBT7xYEcxqKknpdnT5yRpfJq8KrtrvbA6eCBPDUnYkMK5Vch1sUSDfvmBXEBG66ADbXVrKZCZMZBHs8p0QJon1wZIN7xMRYURhhT3gjuohgeNYyfLUuRq1uOZbQXcEZMgNrTfKzPuTJ3sRYxQB1XNgSVB9ByCGhx9W2/uKuJ9b2Gznab8oR3QX0lLOBfWiko05p4KHfEvIdGezD+pKZpma1GGKtNuC7NQf1mp+EVsh2+ULUL9QmsZgZmDL3AbR2XjFWjR9OdBFLNcsV3kxu2bcJSX4plOU1coZkh2jAYlEmbwcdLbKVc4Tc5fvjnIQUUvpZh+r6GnR+/u0s=
- secure: L2Yesk9kHmvgEVQ8T4g7g0sKC+7daKZvyuzM9ubHUeKgAho+fXz9Y8QBYugmh02K79luzRBsL8Qa4bisrbF/JUr6psNbSpgAhNHvOto8Iw+0L8MlFFUw2MSkuxdK9DnWZEMdKt7D4y6q3Px8aSi+eHepnK+ZvGys1SDC2eXfr1hDyLplCIBJXZYNDLw3dUwU3wk4zejSVxeYYwDxu9JjidzQ3vgsJ9XD+7UjnN2t8x8XirnmcNWvCh2fbMdpXsVsdp+Pf6JaC9y/L9yUKPmm25t2bvtbRgWxA2pLlGdLXjmYMlZR940PCHZEZi+TIdJ1dHQIkLDHQMjwQzB10FGNUOPYRZK3dWiPoem4uTIMM8ORrL9C2E/pwPYAUKzaIfJXs03MQtiu51yEcPsOsXwBnsmaBcvZV8pbtmBQOlDzLPNNd8LiEFBEd2Br2VxUJvDwwoyviEQoBigcU7uyi+AcX6EsZ+uGh/TjEcc4JzwIehj/hTyrDmVF1j/69IH37oPAqabCDfkWjISycPPeMmTwldivH25w9E/HvS24ZEdw9or5ES/u4Tam7Kbfm4eGAUS+GX1scEtLrLt/wcm2bTIV87FTIIgPKVTFI6OaOcV4BCmUq3NrnfG5Dko80zMJ0wB+2S5RRq8UMK2LSuuQms7cImtfZrNpa/n9d6UUYblkYsA=
- secure: jCpD5FXTHW/lUyrtrT+744aBq5mj15tfBhtBuwbFVIWdGh5XfKH+Ee+f8ErTtPq7f0j5yzgdz3jo6sgYG4zQctc2N5Cmg10HltSwCNd28TK/V6ySqcjk5YEpjgoQH216hPn3WjiZsnT7gAEVOFV/T7Op+Zoh+JQ6unW4IvJycxhrWKrXfmU4RIr7ul/Sk/KiGrCjdoSElyVnxJMH1jgiUONXBEybTR8uduxASkHmgbBU69xkUzgPJDc9x2juPI5bes69r2fADmtC8+X9J6S0GQq9FLb0jaSVF8iYU0eXOd2bcvjzsPqJz142GxtNTHbbVsHWLj3qNamxTHkQ016wMsCihTstIMUoexxxxrffKidCVME0sWUlCrnnmkcSxUWjJwF6Q6D5yajQy9ZiA0azy5zYwq5Oe827659vFGXxnoKf1d0QqoEyzUDiBXC6PqxoMgDo8zdd6PWWiUJtB/CwgYNrp/9yBb8yU1znBP7jq5f6oaufzSFPYvGlXyFMbOmm7ATaJswKPJy+QjNkAlj81uizbjMOdOE2bkVC8Hbu2Im0F/OEgRroE7WRefJjIBO36Zg/DyUFrCRfpg2yHTog17Y18BemXgtga0bfZkPk3PX6jE6D5JU976gWDmTfcGfduUn62tznxvHMQbfwPZVj4orQLtTk17ZGsszLviEeLHY=
- secure: XQnN/O10kWHr6Z3Ixp2zlBmbR8hZ058X7oV5CKBnyeQYsY4/KTJnotwOGkHbfOZT2BCfgQvf8YqwNOoALwm5aWS2Mwzo5yJTHkthK7rP5/zRgNGczavNGAoN6sJcHnuChdaTNsHfNRa+Oo7OwoQitb0drFTE2oLUC6UbsXhf/0Gqew+YtCjh9SDDupRDiM6OqZKExenIFMeFGyST42br2xYyGZkyNtzY1WRV0mdznmBe+MFRU72HlvIF+82nODcFtonjQjwHnWaXinrb2tGQZGX0NwFJU5fap/VWYMa2D5nXmPloitpkIOTq/p5CoHKVEi605fIy0Xd/j2iiSpOYX5se6ctXInub+x/ZlVpuLp15bplfGDhxSRWi1gaiAIPSuZVMx8lo9hivJ0c2VVQT3aNy+AuyURAwFCY8E9jXfy9SLfmsV1wCEqaUARyT1Cg/E1U0qFOBUxnKG5d262VaPk/tmd1lGih0HslNKicPlNSUlAVIe77+cGEbxCvts2Ayl6s34xJGvecGOk+crXdLCof0YMkRW4vZbqr8xGJVu+CRSyjtkBla13vgp6xxXASw61/6XBT5+6kEtUSbsy/JSNaHh9RxVd9qgizasU2xOhGVOHb6csVz0pqNXqE5SRDEqbYIkS2X+YTRFGcEnukLGzPwLZqT64a7MFwA02DoWJM=
- secure: JiNM+YXuuTKYKvSL2kvsNLngFxxYaHD7Nr4K7SBDrrbP5+ILzs74OQoNdLCr14sm5qGcA1OizGEaxwWKJXqeXZ+BQ4eJPuTp2CAXa6PFmrv/TimWV9XwmhOeVsEoq5RjScPJh5VZ8bfMsEM3bEUNIU+8Jesyz7GoiK4F4hfgy1E5uhqPq1ZIG3NfCNOgo27te+0CO1vnmUs0wEIdQWCafO/Y2T5lqrVlUaGI+eQzL0d8ohK1Y+Y5ZisePz4qgQ3umDXF/9AKl5HqHosCUY4pEXYgBSmnbldIjkabB/0983MetFcsvtGWEYoQD8EvHw+i43ppd2bIm9Q1Mep2mtp8aZsF5Tg2WOcWM9zekiJ/QipHMBMkpTGK15Tbjw/7YGLcyUVTK7aaK7KbrTl504wO+J6g77KFBEDYhDChAMpRaNfCIDnlkwNpsGNQSDLOaVgDS2b+UT4OwvFGwnB8/ytJSFHmyWMN7atB/6hCpLTUm60Foip0buXQmONCDUsj3RNy4X9DjrMHOkcQMXDjtu6HDRP+zkcbWBadUJfM7fxw8LZtBY0luyUzLCm67fuD1jYiJHrt5zwg4A4XsmzPinfUsu1jIOizQv81ww2+OM3j/bxGC1/oj5kbhyD8n57Ltdt7YzzRevXGowaiYWm6ySqrRKNxRX4y2sISvNM7IWmrqcs=
script:
- export GO111MODULE=on
- make test-cover
- goveralls -coverprofile=cover.out -service=travis-ci
- stage: integration tests
if: type IN (push, cron)
env:
- TF_VAR_resource_group_name=eventhub-eh-travis
- secure: IWLgsNBMMDBjJSLRixla9jqiTm3xeERnPiECt+PdjUH3ZevyLOR1tX1Q2V6WS9afFsoddM5QpvqXF50UpNzD2WwnE492syL4x0rHfMhV4aZcN82zrMuggYAenLINfBYcryciGgTtPK767vdU3sbqNl7Nkz0lGd/SYqvgCCd9djMZfG5OccuMm1m/v7VCf0BwJgbk6wH0+J+YocFPpROkvIfPiclJIB+o1ejp8odYsbV2ZO144i7AM18oeEt13iBKz8kIXEf2/xZZ7nZu33VYqpp9K7gRc83ORDZSaKBb+SwtnnaXBrcAKayeJCfsumkPh94eFFqogfOiC2iZg07B1ojQir+n2bgPpBO6Fa4G+A1i7sVotYTmzmBNX7I8K4+0F5U96srtrFWHO13X/2OKxTNRBv2w6rf5Sz8AD7mnuLhAHxXTXdlYAiQ8CaKpUIPhzJBF9A/Ax3fPbSLAvUBGcP/9uTV3NvGQynVaSF0D4DzI3X3hRjuTso14b0EYY5wuBaikenILH+POoTaGSu0/VHQwBvx47K3/GjOX9vt0yOsqFO08uYL5Vho+BSEHy+ZiifpZrXsg9r+OBcxciQv6kLIDC2j13qptFK0liqknMxjB0YSpVqQGRxs5E4Ld5iKGtxDgx2A/VFxsUa86V1wkRlPUvh0BRXwJaYhEJ3GwsDQ=
- secure: q2/2LT3wVgWbjdEBi6hl1kqH+81GchibIyZNObAIEywgA2g33YzvosqvKoPaOHIGbuv9x3bOO4koHH1P/juwAzO9VElyf5XiRiB6ztjykzFYWfVTpZewngZjEP/qpMhh4akODpGjqXahfy3anED2quZ0jjNNQkVY5WvLFbDRW7RCg/sW6AjjLldYFCclxVLyDgIOjWG/vGnu0Qrn9incUiHLEwqm0DR2iiOEs/8jXYXd7LTQVPGnq9jWX4m+9jfgNzEi9Bqns/bB5yr4JFl41Bm+8p8+kjCk2mq2wE5Q2n+QwVgy4uq2GEdyLF/UKglLA5T4bwBByNlwENDnKpDVgdjEwOzeaoLLSSFnWLcfVuDkH6ZS7EYQKU7Q8lszeqKI+rBS37mp2BUR7A/csNzN5tTzLWoRWnd1j66J6y7bzlg5jPkLwndV2qoStssT78kLyZw7jAg5S/+B3iujQS9GaicJTc+h7Or1QS9c/sd4t8VtsDDK+kildSpvvfSRNbtQzCMhwcPEVc7f5LTGSz82mimejjp4j6Pr3iZDUgVPMZi/npLVeZyXSovuvE+gdzssdH+cyjDkx8YYVhI1grefxKrHRKXaqcMP8J3VGFlGOCPQwBTNj1VSWtCdDsj51O6kIaqqPQ1Rcbf8wQ7e0n5cO1WOECkLkgGYkY0/B439ZYg=
- secure: f+xDBxiUHEvQlMid5EID640omZYFzOrr6AAw2pNJ8fv9cMoaFw1mO07KmA4zrA+mzeE9ixLhPbbGvK0/WtJ3G4DHOLyYUJHFd4eKzhhTpWCNz8c3U57qY4mHJpiYEAEgraSAUAsP78brJVolvdZnxbdVuyW7xp7t9ckOUupevKQx1FlOPfeCiaMI852DZwoGckli23nzCl9If8voEG6fDjnJktwrGQfxrKu2CPAlpWUydghTnabhHVovwExYm7lLQY8Z7VjoTydfV2BQBuGsTr6/6PtUlMjFJbxK7ayR2mk565pzLjZmVUwBYRfbTbjWd+LFtQSLoVEI5RoHc9n48fHB7z68nRkkQ2/LQCeYnrj5RCaPgdfM81qbR0RT0sUm/lyqPNUuimPwkjQLEY7DCAkCGFoI1zPkM1ttQcClMmq+hQY5PGHREccLAyhhbiyGDzlw1UgVOoiwT4uxuzegliPsaGCmPaOMZxVOnh3QSjtWVkXukixPWmKRF7NyvRdZkTS7Uf3/SNcOTRYA32X72jXRUvv6mh6igKJgm3o5exXaf+kG1KKTxmNXjKYcRpQd6jGKa9ahM1Efw+G9RD+e7EDSIm/nr2b6Hu/m+KxDLgwxv3okrNXlLBP6CJK0nPeXRGcNqhNhTbT26Ui7l9cIxggrk5fGmNs8AuCMZL76KTU=
- secure: XqkiUC+7iRfR68FuRcGfKy4pa7HaKermLZoDeC657eUGtCgZyvItM4sbQQOkKa7sP7OPJKD7dz1mFFQvuzrJKlgP3JrMJ+IBaYQ1nq0qt4RBQ0LP0s9/0MUkmMmCAXANdHXsWN9ecIIeJNK0RC/zZEHAwxFG0CO4uM0smvimAbc6sLLewADdU5SRwp0/T6IqTCU+EEbkw/RlSttggDxGrxEjBcsoSfouRTCUKPsy/68a+JF5ONMmiS9uHko/FRF+Oyfhd1uORPVCJdyEkGM/zqO3s0vC9iSQLLYRdT+g1eHaa+/DrK6PXQtKoh9khijNbZn1q+1AQqlJhG1k8I2PS77YPasYaB1zUbFtD38pcSLsDk+Y/bN0VX9D1enN6yp3oIt5+6DnlZS9xfhArttZqEqo8axB9zbpSxILvnIT0eosqbL+AAL74dWiL6+4vbmmEf7PAI1xuCAZUKoAxeQPyHuzvqKlJJ63wHAYMNXfRVdLfAr/kDCcoXXr8xgFq9E0m8hAkCgiGTuXf7hCLczjkjYh0xbkUDuguLGCSCDy/Q39toeTZmSVkJs18E+v3j1/pQXeFJpsMHUBZteNOY+BP02CNlK5Zi+WfGNx5y+38JKd3c7Eykx5Lrg8pDlsuE+b5xJaPV3sGFgktI1dPH4obk7RWKTqvEN/IyFwhOsdcJE=
script:
- curl -sLo /tmp/terraform.zip https://releases.hashicorp.com/terraform/0.11.10/terraform_0.11.10_linux_amd64.zip
- unzip /tmp/terraform.zip -d /tmp
- mkdir -p ~/bin
- mv /tmp/terraform ~/bin
- export PATH="~/bin:$PATH"
- export GO111MODULE=on
- make test-cover
- goveralls -coverprofile=cover.out -service=travis-ci
- make destroy
script:
- export GO111MODULE=on
- make
- make

Просмотреть файл

@ -15,7 +15,7 @@ GOSTATICCHECK = $(BIN)/staticcheck
V = 0
Q = $(if $(filter 1,$V),,@)
M = $(shell printf "\033[34;1m▶\033[0m")
TIMEOUT = 360
TIMEOUT = 720
.PHONY: all
all: fmt lint vet tidy build
@ -35,8 +35,8 @@ test-race: ARGS=-race ## Run tests with race detector
test-cover: ARGS=-cover -coverprofile=cover.out -v ## Run tests in verbose mode with coverage
$(TEST_TARGETS): NAME=$(MAKECMDGOALS:test-%=%)
$(TEST_TARGETS): test
check test tests: cyclo lint vet ; $(info $(M) running $(NAME:%=% )tests) @ ## Run tests
$Q $(GO) test -timeout $(TIMEOUT)s $(ARGS) ./...
check test tests: cyclo lint vet terraform.tfstate; $(info $(M) running $(NAME:%=% )tests) @ ## Run tests
$(GO) test -timeout $(TIMEOUT)s $(ARGS) ./...
.PHONY: vet
vet: ; $(info $(M) running vet) @ ## Run vet
@ -64,6 +64,17 @@ fmt: ; $(info $(M) running gofmt…) @ ## Run gofmt on all source files
cyclo: ; $(info $(M) running gocyclo...) @ ## Run gocyclo on all source files
$Q $(GOCYCLO) -over 19 $$($(GO_FILES))
terraform.tfstate: azuredeploy.tf $(wildcard terraform.tfvars) .terraform ; $(info $(M) running terraform...) @ ## Run terraform to provision infrastructure needed for testing
$Q TF_VAR_azure_client_secret="$${ARM_CLIENT_SECRET}" terraform apply -auto-approve
$Q terraform output > .env
.terraform:
$Q terraform init
.Phony: destroy
destroy: ; $(info $(M) running terraform destroy...)
$(Q) terraform destroy --auto-approve
# Misc
.PHONY: clean

Просмотреть файл

@ -8,8 +8,8 @@ import (
"os"
"time"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-amqp-common-go/aad"
"github.com/Azure/azure-event-hubs-go"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/Azure/go-autorest/autorest/azure/auth"
@ -29,7 +29,10 @@ func main() {
fmt.Print("Enter text: ")
text, _ := reader.ReadString('\n')
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
hub.Send(ctx, eventhub.NewEventFromString(text))
err := hub.Send(ctx, eventhub.NewEventFromString(text))
if err != nil {
log.Fatal(err)
}
if text == "exit\n" {
break
}
@ -95,4 +98,3 @@ func getEventHubMgmtClient() *mgmt.EventHubsClient {
client.Authorizer = a
return &client
}

146
azuredeploy.tf Normal file
Просмотреть файл

@ -0,0 +1,146 @@
variable "location" {
# eastus support AAD authentication, which at the time of writing this is in preview.
# see: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-role-based-access-control
description = "Azure datacenter to deploy to."
default = "eastus"
}
variable "eventhub_name_prefix" {
description = "Input your unique Azure Service Bus Namespace name"
default = "azureehtests"
}
variable "resource_group_name" {
description = "Resource group to provision test infrastructure in."
default = "eventhub-go-tests"
}
variable "azure_client_secret" {
description = "(Optional) piped in from env var so .env will be updated if there is an existing client secret"
default = "foo"
}
# Data resources used to get SubID and Tennant Info
data "azurerm_client_config" "current" {}
resource "random_string" "name" {
keepers = {
# Generate a new id each time we switch to a new resource group
group_name = "${var.resource_group_name}"
}
length = 8
upper = false
special = false
number = false
}
# Create resource group for all of the things
resource "azurerm_resource_group" "test" {
name = "${var.resource_group_name}"
location = "${var.location}"
}
# Create an Event Hub namespace for testing
resource "azurerm_eventhub_namespace" "test" {
name = "${var.eventhub_name_prefix}-${random_string.name.result}"
location = "${azurerm_resource_group.test.location}"
resource_group_name = "${azurerm_resource_group.test.name}"
sku = "standard"
}
resource "azurerm_storage_account" "test" {
name = "${var.eventhub_name_prefix}${random_string.name.result}"
resource_group_name = "${var.resource_group_name}"
location = "${azurerm_resource_group.test.location}"
account_replication_type = "LRS"
account_tier = "Standard"
}
# Generate a random secret fo the service principal
resource "random_string" "secret" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
length = 32
upper = true
special = true
number = true
}
// Application for AAD authentication
resource "azurerm_azuread_application" "test" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
name = "eventhubstest"
homepage = "https://eventhubstest"
identifier_uris = ["https://eventhubstest"]
reply_urls = ["https://eventhubstest"]
available_to_other_tenants = false
oauth2_allow_implicit_flow = true
}
# Create a service principal, which represents a linkage between the AAD application and the password
resource "azurerm_azuread_service_principal" "test" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
application_id = "${azurerm_azuread_application.test.application_id}"
}
# Create a new service principal password which will be the AZURE_CLIENT_SECRET env var
resource "azurerm_azuread_service_principal_password" "test" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
service_principal_id = "${azurerm_azuread_service_principal.test.id}"
value = "${random_string.secret.result}"
end_date = "2030-01-01T01:02:03Z"
}
# This provides the new AAD application the rights to managed, send and receive from the Event Hubs instance
resource "azurerm_role_assignment" "service_principal_eh" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
scope = "subscriptions/${data.azurerm_client_config.current.subscription_id}/resourceGroups/${azurerm_resource_group.test.name}/providers/Microsoft.EventHub/namespaces/${azurerm_eventhub_namespace.test.name}"
role_definition_name = "Owner"
principal_id = "${azurerm_azuread_service_principal.test.id}"
}
# This provides the new AAD application the rights to managed the resource group
resource "azurerm_role_assignment" "service_principal_rg" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
scope = "subscriptions/${data.azurerm_client_config.current.subscription_id}/resourceGroups/${azurerm_resource_group.test.name}"
role_definition_name = "Owner"
principal_id = "${azurerm_azuread_service_principal.test.id}"
}
output "TEST_EVENTHUB_RESOURCE_GROUP" {
value = "${var.resource_group_name}"
}
output "EVENTHUB_CONNECTION_STRING" {
value = "Endpoint=sb://${azurerm_eventhub_namespace.test.name}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${azurerm_eventhub_namespace.test.default_primary_key}"
sensitive = true
}
output "EVENTHUB_NAMESPACE" {
value = "${azurerm_eventhub_namespace.test.name}"
}
output "AZURE_SUBSCRIPTION_ID" {
value = "${data.azurerm_client_config.current.subscription_id}"
}
output "TEST_EVENTHUB_LOCATION" {
value = "${var.location}"
}
output "AZURE_TENANT_ID" {
value = "${data.azurerm_client_config.current.tenant_id}"
}
output "AZURE_CLIENT_ID" {
value = "${element(compact(concat(azurerm_azuread_application.test.*.application_id, list(data.azurerm_client_config.current.client_id))),0)}"
}
output "AZURE_CLIENT_SECRET" {
value = "${element(compact(concat(azurerm_azuread_service_principal_password.test.*.value, list(var.azure_client_secret))),0)}"
sensitive = true
}
output "STORAGE_ACCOUNT_NAME" {
value = "${azurerm_storage_account.test.name}"
}

Просмотреть файл

@ -2,6 +2,9 @@
## `head`
- add receive option to receive from a timestamp
- fix sender recovery on temporary network failures
- add LeasePersistenceInterval to Azure Storage LeaserCheckpointer to allow for customization of persistence interval
duration
## `v1.0.1`
- fix the breaking change from storage; this is not a breaking change for this library

Просмотреть файл

@ -275,7 +275,9 @@ func (h *EventProcessorHost) UnregisterHandler(ctx context.Context, id HandlerID
delete(h.handlers, string(id))
if len(h.handlers) == 0 {
h.Close(ctx)
if err := h.Close(ctx); err != nil {
log.For(ctx).Error(err)
}
}
}

3
go.mod
Просмотреть файл

@ -9,6 +9,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dimchansky/utfbom v1.0.0 // indirect
github.com/fortytw2/leaktest v1.2.0 // indirect
github.com/joho/godotenv v1.3.0
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7
github.com/kr/pretty v0.1.0 // indirect
github.com/mitchellh/go-homedir v1.0.0 // indirect
@ -21,5 +22,5 @@ require (
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba // indirect
google.golang.org/api v0.0.0-20181018171847-1ee037c97071 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
pack.ag/amqp v0.7.4
pack.ag/amqp v0.10.2
)

6
go.sum
Просмотреть файл

@ -25,6 +25,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 h1:K//n/AqR5HjG3qxbrBCL4vJPW0MVFSs9CPK1OOJdRME=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe h1:CHRGQ8V7OlCYtwaKPJi3iA7J+YdNKdo8j7nG5IgDhjs=
@ -76,5 +78,5 @@ gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
pack.ag/amqp v0.7.4 h1:SKaYf2EzcgN2bN9GTMCNu+xn9aOxMPYNgeyqYvAEtnM=
pack.ag/amqp v0.7.4/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
pack.ag/amqp v0.10.2 h1:tOg29Eqx2kmgcDJa7OAjH9N3jqGA1gHf5iIAnBMsa5U=
pack.ag/amqp v0.10.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=

Просмотреть файл

@ -283,11 +283,11 @@ func testBasicSend(ctx context.Context, t *testing.T, client *Hub, _ string) {
}
func testSendTooBig(ctx context.Context, t *testing.T, client *Hub, _ string) {
data := make([]byte, 256*1024)
data := make([]byte, 2600*1024)
_, _ = rand.Read(data)
event := NewEvent(data)
err := client.Send(ctx, event)
assert.Error(t, err)
assert.Error(t, err, "encoded message size exceeds max of 1048576")
}
func testBatchSendAndReceive(ctx context.Context, t *testing.T, client *Hub, partitionID string) {
@ -448,9 +448,7 @@ func testMultiSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par
}
for idx, message := range messages {
if !assert.NoError(t, client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx)))) {
assert.FailNow(t, "unable to send message")
}
require.NoError(t, client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx))))
}
for _, partitionID := range partitionIDs {
@ -562,14 +560,17 @@ func testHubPartitionRuntimeInformation(ctx context.Context, t *testing.T, clien
}
func TestEnvironmentalCreation(t *testing.T) {
os.Setenv("EVENTHUB_NAME", "foo")
require.NoError(t, os.Setenv("EVENTHUB_NAME", "foo"))
_, err := NewHubFromEnvironment()
assert.Nil(t, err)
os.Unsetenv("EVENTHUB_NAME")
require.NoError(t, os.Unsetenv("EVENTHUB_NAME"))
}
func (suite *eventHubSuite) newClient(t *testing.T, hubName string, opts ...HubOption) (*Hub, func()) {
provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars(), aad.JWTProviderWithAzureEnvironment(&suite.Env))
provider, err := aad.NewJWTProvider(
aad.JWTProviderWithEnvironmentVars(),
aad.JWTProviderWithAzureEnvironment(&suite.Env),
)
if !suite.NoError(err) {
suite.FailNow("unable to make a new JWT provider")
}

Просмотреть файл

@ -39,6 +39,7 @@ import (
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/Azure/go-autorest/autorest/to"
"github.com/joho/godotenv"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
"go.opencensus.io/exporter/jaeger"
@ -54,23 +55,17 @@ const (
defaultTimeout = 1 * time.Minute
)
const (
// Location is the Azure geographic location the test suite will use for provisioning
Location = "eastus"
// ResourceGroupName is the name of the resource group the test suite will use for provisioning
ResourceGroupName = "ehtest"
)
type (
// BaseSuite encapsulates a end to end test of Event Hubs with build up and tear down of all EH resources
BaseSuite struct {
suite.Suite
SubscriptionID string
Namespace string
Env azure.Environment
TagID string
closer io.Closer
SubscriptionID string
Namespace string
ResourceGroupName string
Location string
Env azure.Environment
TagID string
closer io.Closer
}
// HubMgmtOption represents an option for configuring an Event Hub.
@ -81,6 +76,7 @@ type (
func init() {
rand.Seed(time.Now().Unix())
loadEnv()
}
// SetupSuite constructs the test suite from the environment and
@ -90,8 +86,10 @@ func (suite *BaseSuite) SetupSuite() {
log.SetLevel(log.DebugLevel)
}
suite.SubscriptionID = mustGetEnv("AZURE_SUBSCRIPTION_ID")
suite.Namespace = mustGetEnv("EVENTHUB_NAMESPACE")
suite.SubscriptionID = MustGetEnv("AZURE_SUBSCRIPTION_ID")
suite.Namespace = MustGetEnv("EVENTHUB_NAMESPACE")
suite.ResourceGroupName = MustGetEnv("TEST_EVENTHUB_RESOURCE_GROUP")
suite.Location = MustGetEnv("TEST_EVENTHUB_LOCATION")
envName := os.Getenv("AZURE_ENVIRONMENT")
suite.TagID = RandomString("tag", 5)
@ -122,7 +120,7 @@ func (suite *BaseSuite) TearDownSuite() {
defer cancel()
suite.deleteAllTaggedEventHubs(ctx)
if suite.closer != nil {
suite.closer.Close()
suite.NoError(suite.closer.Close())
}
}
@ -139,10 +137,7 @@ func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func()) {
suite.Require().Len(*model.PartitionIds, 4)
return model, func() {
if model != nil {
err := suite.DeleteEventHub(*model.Name)
if err != nil {
suite.T().Log(err)
}
suite.DeleteEventHub(*model.Name)
}
}
}
@ -150,7 +145,7 @@ func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func()) {
// EnsureEventHub creates an Event Hub if it doesn't exist
func (suite *BaseSuite) ensureEventHub(ctx context.Context, name string, opts ...HubMgmtOption) (*mgmt.Model, error) {
client := suite.getEventHubMgmtClient()
hub, err := client.Get(ctx, ResourceGroupName, suite.Namespace, name)
hub, err := client.Get(ctx, suite.ResourceGroupName, suite.Namespace, name)
if err != nil {
newHub := &mgmt.Model{
@ -189,26 +184,26 @@ func (suite *BaseSuite) tryHubCreate(ctx context.Context, client *mgmt.EventHubs
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
_, err := client.CreateOrUpdate(ctx, ResourceGroupName, suite.Namespace, name, *hub)
_, err := client.CreateOrUpdate(ctx, suite.ResourceGroupName, suite.Namespace, name, *hub)
if err != nil {
return mgmt.Model{}, err
}
return client.Get(ctx, ResourceGroupName, suite.Namespace, name)
return client.Get(ctx, suite.ResourceGroupName, suite.Namespace, name)
}
// DeleteEventHub deletes an Event Hub within the given Namespace
func (suite *BaseSuite) DeleteEventHub(name string) error {
func (suite *BaseSuite) DeleteEventHub(name string) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
client := suite.getEventHubMgmtClient()
_, err := client.Delete(ctx, ResourceGroupName, suite.Namespace, name)
return err
_, err := client.Delete(ctx, suite.ResourceGroupName, suite.Namespace, name)
suite.NoError(err)
}
func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) {
client := suite.getEventHubMgmtClient()
res, err := client.ListByNamespace(ctx, ResourceGroupName, suite.Namespace, to.Int32Ptr(0), to.Int32Ptr(20))
res, err := client.ListByNamespace(ctx, suite.ResourceGroupName, suite.Namespace, to.Int32Ptr(0), to.Int32Ptr(20))
if err != nil {
suite.T().Log("error listing namespaces")
suite.T().Error(err)
@ -218,7 +213,7 @@ func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) {
for _, val := range res.Values() {
if strings.Contains(*val.Name, suite.TagID) {
for i := 0; i < 5; i++ {
if _, err := client.Delete(ctx, ResourceGroupName, suite.Namespace, *val.Name); err != nil {
if _, err := client.Delete(ctx, suite.ResourceGroupName, suite.Namespace, *val.Name); err != nil {
suite.T().Logf("error deleting %q", *val.Name)
suite.T().Error(err)
time.Sleep(3 * time.Second)
@ -230,12 +225,12 @@ func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) {
suite.T().Logf("%q does not contain %q", *val.Name, suite.TagID)
}
}
res.Next()
suite.NoError(res.Next())
}
}
func (suite *BaseSuite) ensureProvisioned(tier mgmt.SkuTier) error {
_, err := ensureResourceGroup(context.Background(), suite.SubscriptionID, ResourceGroupName, Location, suite.Env)
_, err := ensureResourceGroup(context.Background(), suite.SubscriptionID, suite.ResourceGroupName, suite.Location, suite.Env)
if err != nil {
return err
}
@ -332,7 +327,7 @@ func (suite *BaseSuite) getEventHubMgmtClient() *mgmt.EventHubsClient {
}
func (suite *BaseSuite) ensureNamespace() (*mgmt.EHNamespace, error) {
ns, err := ensureNamespace(context.Background(), suite.SubscriptionID, ResourceGroupName, suite.Namespace, Location, suite.Env)
ns, err := ensureNamespace(context.Background(), suite.SubscriptionID, suite.ResourceGroupName, suite.Namespace, suite.Location, suite.Env)
if err != nil {
return nil, err
}
@ -365,7 +360,9 @@ func (suite *BaseSuite) setupTracing() error {
}
exporter, err := jaeger.NewExporter(jaeger.Options{
AgentEndpoint: "localhost:6831",
ServiceName: "eh-trace",
Process: jaeger.Process{
ServiceName: "eh-tests",
},
})
if err != nil {
return err
@ -374,7 +371,8 @@ func (suite *BaseSuite) setupTracing() error {
return nil
}
func mustGetEnv(key string) string {
// MustGetEnv will panic or return the env var for a given string key
func MustGetEnv(key string) string {
v := os.Getenv(key)
if v == "" {
panic("Env variable '" + key + "' required for integration tests.")
@ -395,3 +393,36 @@ func RandomString(prefix string, length int) string {
}
return prefix + string(b)
}
func loadEnv() {
lookForMe := []string{".env", "../.env", "../../.env"}
var reader io.ReadCloser
for _, env := range lookForMe {
r, err := os.Open(env)
if err == nil {
reader = r
break
}
}
if reader == nil {
log.Fatalf("no .env files were found in %v", lookForMe)
}
defer func() {
if err := reader.Close(); err != nil {
log.Fatal(err)
}
}()
envMap, err := godotenv.Parse(reader)
if err != nil {
log.Fatal(err)
}
for key, val := range envMap {
if err := os.Setenv(key, val); err != nil {
log.Fatal(err)
}
}
}

Просмотреть файл

@ -25,6 +25,7 @@ package eventhub
import (
"context"
"fmt"
"net"
"time"
"github.com/Azure/azure-amqp-common-go/log"
@ -144,7 +145,13 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
return err
}
switch err.(type) {
case *amqp.Error, *amqp.DetachError:
case *amqp.Error, *amqp.DetachError, net.Error:
if netErr, ok := err.(net.Error); ok {
if !netErr.Temporary(){
return netErr
}
}
duration := s.recoveryBackoff.Duration()
log.For(ctx).Debug("amqp error, delaying " + string(duration/time.Millisecond) + " millis: " + err.Error())
time.Sleep(duration)
@ -202,6 +209,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
}
amqpSender, err := amqpSession.NewSender(
amqp.LinkReceiverSettle(amqp.ModeSecond),
amqp.LinkTargetAddress(s.getAddress()),
)
if err != nil {

Просмотреть файл

@ -24,18 +24,13 @@ package storage
import (
"context"
"errors"
"log"
"net/url"
"strings"
"testing"
"github.com/Azure/azure-amqp-common-go"
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2017-10-01/storage"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/stretchr/testify/suite"
)
@ -54,23 +49,18 @@ func TestStorage(t *testing.T) {
func (ts *testSuite) SetupSuite() {
ts.BaseSuite.SetupSuite()
ts.AccountName = strings.ToLower(test.RandomString("ehtest", 6))
ts.Require().NoError(ts.ensureStorageAccount())
ts.AccountName = test.MustGetEnv("STORAGE_ACCOUNT_NAME")
}
func (ts *testSuite) TearDownSuite() {
ts.BaseSuite.TearDownSuite()
err := ts.deleteStorageAccount()
if err != nil {
ts.T().Error(err)
}
}
func (ts *testSuite) TestCredential() {
containerName := "foo"
blobName := "bar"
message := "Hello World!!"
tokenProvider, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
tokenProvider, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
if err != nil {
ts.T().Fatal(err)
}
@ -84,7 +74,11 @@ func (ts *testSuite) TestCredential() {
}
containerURL := azblob.NewContainerURL(*fooURL, pipeline)
defer containerURL.Delete(ctx, azblob.ContainerAccessConditions{})
defer func(){
if res, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{}); err != nil {
log.Fatal(err, res)
}
}()
_, err = containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
if err != nil {
ts.T().Error(err)
@ -95,67 +89,4 @@ func (ts *testSuite) TestCredential() {
if err != nil {
ts.T().Error(err)
}
}
func (ts *testSuite) deleteStorageAccount() error {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
defer cancel()
client := getStorageAccountMgmtClient(ts.SubscriptionID, ts.Env)
_, err := client.Delete(ctx, test.ResourceGroupName, ts.AccountName)
return err
}
func (ts *testSuite) ensureStorageAccount() error {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
defer cancel()
client := getStorageAccountMgmtClient(ts.SubscriptionID, ts.Env)
accounts, err := client.ListByResourceGroup(ctx, test.ResourceGroupName)
if err != nil {
return err
}
if accounts.Response.Response == nil {
return errors.New("response is nil and error is not nil")
}
if accounts.Response.Response != nil && accounts.StatusCode == 404 {
return errors.New("resource group does not exist")
}
for _, account := range *accounts.Value {
if ts.AccountName == *account.Name {
// provisioned, so return
return nil
}
}
res, err := client.Create(ctx, test.ResourceGroupName, ts.AccountName, storage.AccountCreateParameters{
Sku: &storage.Sku{
Name: storage.StandardLRS,
Tier: storage.Standard,
},
Kind: storage.BlobStorage,
Location: common.PtrString(test.Location),
AccountPropertiesCreateParameters: &storage.AccountPropertiesCreateParameters{
AccessTier: storage.Hot,
},
})
if err != nil {
return err
}
return res.WaitForCompletionRef(ctx, client.Client)
}
func getStorageAccountMgmtClient(subscriptionID string, env azure.Environment) *storage.AccountsClient {
client := storage.NewAccountsClientWithBaseURI(env.ResourceManagerEndpoint, subscriptionID)
a, err := azauth.NewAuthorizerFromEnvironment()
if err != nil {
log.Fatal(err)
}
client.Authorizer = a
return &client
}
}

Просмотреть файл

@ -35,7 +35,6 @@ import (
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-event-hubs-go/eph"
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/Azure/azure-storage-blob-go/azblob"
)
@ -55,7 +54,9 @@ func (ts *testSuite) TestSingle() {
ts.Require().NoError(err)
defer func() {
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
processor.Close(closeContext)
if err := processor.Close(closeContext); err != nil {
ts.Error(err)
}
cancel()
delHub()
}()
@ -66,12 +67,13 @@ func (ts *testSuite) TestSingle() {
var wg sync.WaitGroup
wg.Add(len(messages))
processor.RegisterHandler(ctx, func(c context.Context, event *eventhub.Event) error {
_, err = processor.RegisterHandler(ctx, func(c context.Context, event *eventhub.Event) error {
wg.Done()
return nil
})
ts.Require().NoError(err)
processor.StartNonBlocking(ctx)
ts.Require().NoError(processor.StartNonBlocking(ctx))
end, _ := ctx.Deadline()
waitUntil(ts.T(), &wg, time.Until(end))
}
@ -85,7 +87,7 @@ func (ts *testSuite) TestMultiple() {
delContainer := ts.newTestContainerByName(*hub.Name)
defer delContainer()
cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, *hub.Name, AADSASCredentialWithEnvironmentVars())
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, *hub.Name, AADSASCredentialWithEnvironmentVars())
ts.Require().NoError(err)
numPartitions := len(*hub.PartitionIds)
processors := make(map[string]*eph.EventProcessorHost, numPartitions)
@ -105,7 +107,9 @@ func (ts *testSuite) TestMultiple() {
defer func() {
for _, processor := range processors {
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
processor.Close(closeContext)
if err := processor.Close(closeContext); err != nil {
ts.Error(err)
}
cancel()
}
delHub()
@ -140,7 +144,7 @@ func (ts *testSuite) TestMultiple() {
}
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
processors[processorNames[numPartitions-1]].Close(closeContext) // close the last partition
ts.Require().NoError(processors[processorNames[numPartitions-1]].Close(closeContext)) // close the last partition
delete(processors, processorNames[numPartitions-1])
cancel()
@ -175,7 +179,7 @@ func (ts *testSuite) newTestContainerByName(containerName string) func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
ts.Require().NoError(err)
pipeline := azblob.NewPipeline(cred, azblob.PipelineOptions{})
@ -189,7 +193,9 @@ func (ts *testSuite) newTestContainerByName(containerName string) func() {
return func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
containerURL.Delete(ctx, azblob.ContainerAccessConditions{})
if res, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{}); err != nil {
ts.NoError(err, res)
}
}
}
@ -197,7 +203,7 @@ func (ts *testSuite) sendMessages(hubName string, length int) ([]string, error)
client := ts.newClient(ts.T(), hubName)
defer func() {
closeContext, cancel := context.WithTimeout(context.Background(), 30*time.Second)
client.Close(closeContext)
ts.NoError(client.Close(closeContext))
cancel()
}()
@ -213,13 +219,13 @@ func (ts *testSuite) sendMessages(hubName string, length int) ([]string, error)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client.SendBatch(ctx, eventhub.NewEventBatch(events))
ts.NoError(client.SendBatch(ctx, eventhub.NewEventBatch(events)))
return messages, ctx.Err()
}
func (ts *testSuite) newStorageBackedEPH(hubName, containerName string) (*eph.EventProcessorHost, error) {
cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
ts.Require().NoError(err)
leaserCheckpointer, err := NewStorageLeaserCheckpointer(cred, ts.AccountName, containerName, ts.Env)
ts.Require().NoError(err)

Просмотреть файл

@ -29,6 +29,7 @@ import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"net/url"
"sync"
"time"
@ -47,18 +48,20 @@ import (
type (
// LeaserCheckpointer implements the eph.LeaserCheckpointer interface for Azure Storage
LeaserCheckpointer struct {
leases map[string]*storageLease
processor *eph.EventProcessorHost
leaseDuration time.Duration
credential Credential
containerURL *azblob.ContainerURL
serviceURL *azblob.ServiceURL
containerName string
accountName string
env azure.Environment
dirtyPartitions map[string]uuid.UUID
leasesMu sync.Mutex
done func()
// LeasePersistenceInterval is the default period of time which dirty leases will be persisted to Azure Storage
LeasePersistenceInterval time.Duration
leases map[string]*storageLease
processor *eph.EventProcessorHost
leaseDuration time.Duration
credential Credential
containerURL *azblob.ContainerURL
serviceURL *azblob.ServiceURL
containerName string
accountName string
env azure.Environment
dirtyPartitions map[string]uuid.UUID
leasesMu sync.Mutex
done func()
}
storageLease struct {
@ -85,6 +88,10 @@ type (
}
)
const (
defaultLeasePersistenceInterval = 5 * time.Second
)
// NewStorageLeaserCheckpointer builds an Azure Storage Leaser Checkpointer which handles leasing and checkpointing for
// the EventProcessorHost
func NewStorageLeaserCheckpointer(credential Credential, accountName, containerName string, env azure.Environment) (*LeaserCheckpointer, error) {
@ -97,15 +104,16 @@ func NewStorageLeaserCheckpointer(credential Credential, accountName, containerN
containerURL := svURL.NewContainerURL(containerName)
return &LeaserCheckpointer{
credential: credential,
containerName: containerName,
accountName: accountName,
leaseDuration: eph.DefaultLeaseDuration,
env: env,
serviceURL: &svURL,
containerURL: &containerURL,
leases: make(map[string]*storageLease),
dirtyPartitions: make(map[string]uuid.UUID),
credential: credential,
containerName: containerName,
accountName: accountName,
leaseDuration: eph.DefaultLeaseDuration,
env: env,
serviceURL: &svURL,
containerURL: &containerURL,
leases: make(map[string]*storageLease),
dirtyPartitions: make(map[string]uuid.UUID),
LeasePersistenceInterval: defaultLeasePersistenceInterval,
}, nil
}
@ -477,7 +485,7 @@ func (sl *LeaserCheckpointer) persistLeases(ctx context.Context) {
if err != nil {
log.For(ctx).Error(err)
}
<-time.After(1 * time.Second)
<-time.After(sl.LeasePersistenceInterval)
}
}
}
@ -596,10 +604,13 @@ func (sl *LeaserCheckpointer) getLease(ctx context.Context, partitionID string)
}
func (sl *LeaserCheckpointer) leaseFromResponse(res *azblob.DownloadResponse) (*storageLease, error) {
buf := new(bytes.Buffer)
buf.ReadFrom(res.Response().Body)
b, err := ioutil.ReadAll(res.Response().Body)
if err != nil {
return nil, err
}
var lease storageLease
if err := json.Unmarshal(buf.Bytes(), &lease); err != nil {
if err := json.Unmarshal(b, &lease); err != nil {
return nil, err
}
lease.leaser = sl

Просмотреть файл

@ -29,7 +29,6 @@ import (
"github.com/Azure/azure-amqp-common-go/aad"
"github.com/Azure/azure-event-hubs-go/eph"
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/stretchr/testify/assert"
)
@ -189,7 +188,7 @@ func (ts *testSuite) leaserWithEPH() (*LeaserCheckpointer, func()) {
func (ts *testSuite) newLeaser() (*LeaserCheckpointer, func()) {
containerName := strings.ToLower(ts.RandomName("stortest", 4))
cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
ts.Require().NoError(err)
leaser, err := NewStorageLeaserCheckpointer(cred, ts.AccountName, containerName, ts.Env)
ts.Require().NoError(err)