Basil Hariri 2018-09-06 11:24:18 -07:00
Parent 563f3ee784
Commit c98d0537de
51 changed files with 2778 additions and 46 deletions

148
.gitignore vendored

@@ -1,3 +1,63 @@
### Eclipse ###
.metadata
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.settings/
.loadpath
.recommenders
# External tool builders
.externalToolBuilders/
# Locally stored "Eclipse launch configurations"
*.launch
# PyDev specific (Python IDE for Eclipse)
*.pydevproject
# CDT-specific (C/C++ Development Tooling)
.cproject
# Java annotation processor (APT)
.factorypath
# PDT-specific (PHP Development Tools)
.buildpath
# sbteclipse plugin
.target
# Tern plugin
.tern-project
# TeXlipse plugin
.texlipse
# STS (Spring Tool Suite)
.springBeans
# Code Recommenders
.recommenders/
# Scala IDE specific (Scala & Java development for Eclipse)
.cache-main
.scala_dependencies
.worksheet
### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
### VisualStudio ###
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
##
@@ -24,14 +84,11 @@ bld/
[Oo]bj/
[Ll]og/
# Visual Studio 2015/2017 cache/options directory
# Visual Studio 2015 cache/options directory
.vs/
# Uncomment if you have tasks that create the project's static files in wwwroot
#wwwroot/
# Visual Studio 2017 auto generated files
Generated\ Files/
# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*
@@ -45,29 +102,20 @@ TestResult.xml
[Rr]eleasePS/
dlldata.c
# Benchmark Results
BenchmarkDotNet.Artifacts/
# .NET Core
project.lock.json
project.fragment.lock.json
artifacts/
**/Properties/launchSettings.json
# StyleCop
StyleCopReport.xml
# Files built by Visual Studio
*_i.c
*_p.c
*_i.h
*.ilk
*.meta
*.obj
*.iobj
*.pch
*.pdb
*.ipdb
*.pgc
*.pgd
*.rsp
@@ -75,7 +123,6 @@ StyleCopReport.xml
*.tlb
*.tli
*.tlh
*.tmp
*.tmp_proj
*.log
*.vspscc
@@ -105,9 +152,6 @@ ipch/
*.vspx
*.sap
# Visual Studio Trace Files
*.e2e
# TFS 2012 Local Workspace
$tf/
@@ -128,10 +172,6 @@ _TeamCity*
# DotCover is a Code Coverage Tool
*.dotCover
# AxoCover is a Code Coverage Tool
.axoCover/*
!.axoCover/settings.json
# Visual Studio code coverage results
*.coverage
*.coveragexml
@@ -167,7 +207,7 @@ publish/
# Publish Web Output
*.[Pp]ublish.xml
*.azurePubxml
# Note: Comment the next line if you want to checkin your web deploy settings,
# TODO: Comment the next line if you want to checkin your web deploy settings
# but database connection strings (with potential passwords) will be unencrypted
*.pubxml
*.publishproj
@@ -180,11 +220,11 @@ PublishScripts/
# NuGet Packages
*.nupkg
# The packages folder can be ignored because of Package Restore
**/[Pp]ackages/*
**/packages/*
# except build/, which is used as an MSBuild target.
!**/[Pp]ackages/build/
!**/packages/build/
# Uncomment if necessary however generally it will be regenerated when needed
#!**/[Pp]ackages/repositories.config
#!**/packages/repositories.config
# NuGet v3's project.json files produces more ignorable files
*.nuget.props
*.nuget.targets
@@ -202,7 +242,6 @@ AppPackages/
BundleArtifacts/
Package.StoreAssociation.xml
_pkginfo.txt
*.appx
# Visual Studio cache files
# files ending in .cache can be ignored
@@ -221,10 +260,6 @@ ClientBin/
*.publishsettings
orleans.codegen.cs
# Including strong name files can present a security risk
# (https://github.com/github/gitignore/pull/2483#issue-259490424)
#*.snk
# Since there are multiple workflows, uncomment next line to ignore bower_components
# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
#bower_components/
@@ -239,8 +274,6 @@ _UpgradeReport_Files/
Backup*/
UpgradeLog*.XML
UpgradeLog*.htm
ServiceFabricBackup/
*.rptproj.bak
# SQL Server files
*.mdf
@@ -251,7 +284,6 @@ ServiceFabricBackup/
*.rdl.data
*.bim.layout
*.bim_*.settings
*.rptproj.rsuser
# Microsoft Fakes
FakesAssemblies/
@@ -263,6 +295,9 @@ FakesAssemblies/
.ntvs_analysis.dat
node_modules/
# Typescript v1 declaration files
typings/
# Visual Studio 6 build log
*.plg
@@ -302,9 +337,6 @@ __pycache__/
# tools/**
# !tools/packages.config
# Tabs Studio
*.tss
# Telerik's JustMock configuration file
*.jmconfig
@@ -314,17 +346,41 @@ __pycache__/
*.odx.cs
*.xsd.cs
# OpenCover UI analysis results
OpenCover/
# Maven
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
# Azure Stream Analytics local run output
ASALocalRun/
# Avoid ignoring Maven wrapper jar file (.jar files are usually ignored)
!/.mvn/wrapper/maven-wrapper.jar
# MSBuild Binary and Structured Log
*.binlog
#Java
# Compiled class file
*.class
# NVidia Nsight GPU debugger configuration file
*.nvuser
# Log file
*.log
# MFractors (Xamarin productivity tool) working folder
.mfractor/
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.ear
*.zip
*.tar.gz
*.rar
*.iml
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

14
CONTRIBUTING.md Normal file

@@ -0,0 +1,14 @@
# Contributing
This project welcomes contributions and suggestions. Most contributions require you to
agree to a Contributor License Agreement (CLA) declaring that you have the right to,
and actually do, grant us the rights to use your contribution. For details, visit
https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need
to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the
instructions provided by the bot. You will only need to do this once across all repositories using our CLA.
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/)
or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.

22
LICENSE.txt Normal file

@@ -0,0 +1,22 @@
MIT License
Azure Event Hubs for Apache Kafka Ecosystems
Copyright (c) Microsoft Corporation. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

51
README.md Normal file

@@ -0,0 +1,51 @@
# Migrating to Azure Event Hubs for Apache Kafka Ecosystems
An Azure Event Hubs Kafka endpoint enables users to connect to Azure Event Hubs using the Kafka protocol. By making minimal changes to a Kafka application, users can connect to Azure Event Hubs and reap the benefits of the Azure ecosystem. Event Hubs for Kafka Ecosystems supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
When we built Kafka-enabled Event Hubs, we wanted to give Kafka users the stability, scalability, and support of Event Hubs without sacrificing their ability to connect to the network of Kafka-supporting frameworks. With that in mind, we've started rolling out a set of tutorials to show how simple it is to connect Kafka-enabled Event Hubs with various platforms and frameworks. The tutorials in this directory all work right out of the box, but for those of you looking to connect with a framework we haven't covered, this guide outlines the generic steps needed to connect your preexisting Kafka application to an Event Hubs Kafka endpoint.
## Prerequisites
If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.
## Create an Event Hubs namespace
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
### FQDN
For these samples, you will need the connection string from the portal as well as the FQDN that points to your Event Hubs namespace. **The FQDN can be found within your connection string as follows**:
```
Endpoint=sb://{YOUR.EVENTHUBS.FQDN}/;SharedAccessKeyName={SHARED.ACCESS.KEY.NAME};SharedAccessKey={SHARED.ACCESS.KEY}
```
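For example, here is one quick way to pull the FQDN out of a connection string and turn it into a `bootstrap.servers` value (a Node.js sketch; the connection string shown is a placeholder):
```javascript
// Extract the Kafka bootstrap server from an Event Hubs connection string.
const connStr = 'Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXX';
const fqdn = connStr.match(/^Endpoint=sb:\/\/([^\/;]+)/)[1]; // host portion of the Endpoint field
console.log(`${fqdn}:9093`); // Kafka-enabled Event Hubs listen on port 9093
```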
## Update your Kafka client configuration
To connect to a Kafka-enabled Event Hub, you'll need to update the Kafka client configs. If you're having trouble finding yours, try searching for where `bootstrap.servers` is set in your application.
Insert the following configs wherever it makes sense in your application. Make sure to update the `bootstrap.servers` and `sasl.jaas.config` values to direct the client to your Event Hubs Kafka endpoint with the correct authentication.
```
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
request.timeout.ms=60000
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
```
If `sasl.jaas.config` is not a supported configuration in your framework, find the configurations that are used to set the SASL username and password and use those instead. Set the username to `$ConnectionString` and the password to your Event Hubs connection string.
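For example, librdkafka-based clients (like the Go and Node samples in this repository) take the SASL username and password as separate settings; note that librdkafka spells the mechanism setting `sasl.mechanisms`:
```
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password={YOUR.EVENTHUBS.CONNECTION.STRING}
```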
## Run your application
Run your application and see how it goes; in most cases, updating the configuration is enough to make the switch.
## Troubleshooting
Didn't work? In our experience, when the configuration change doesn't go as smoothly as we'd hoped, the issue is usually related to one of the following:
* Getting your framework to cooperate with the SASL authentication protocol required by Event Hubs. See if you can troubleshoot the configuration using your framework's resources on SASL authentication. If you figure it out, let us know and we'll share it with other developers!
* Version issues. Event Hubs for Kafka Ecosystems supports Kafka versions 1.0 and later. Some applications using Kafka version 0.10 and later could work because of the Kafka protocol's backward compatibility, but there's a chance they won't be able to connect or will require some serious tinkering. Since Kafka versions 0.9 and earlier don't support the required SASL protocols, any adapter or client using those versions won't be able to connect to Event Hubs.
If you're still stuck (or if you know the secret to making it work with your framework), let us know by opening up a GitHub issue on this repo!

487
THIRD-PARTY-NOTICES.txt Normal file

@@ -0,0 +1,487 @@
This Third Party Notices file provides notices and information about the third party code
or other materials listed below ("Third Party Code") which are included with these samples.
Microsoft reserves all other rights to the Third Party Code not expressly granted by Microsoft,
whether by implication, estoppel or otherwise.
If there are any issues with or omissions from the notices in this file, please let us know by opening up an issue on this repo.
##############################################################################
# librdkafka licenses #
##############################################################################
LICENSE
--------------------------------------------------------------
librdkafka - Apache Kafka C driver library
Copyright (c) 2012, Magnus Edenhill
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
##############################################################################
# Confluent Python license #
##############################################################################
LICENSE
--------------------------------------------------------------
Confluent's Python Client for Apache Kafka
Copyright (c) 2016, Confluent Inc.
All rights reserved.
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2016 Confluent Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
##############################################################################
# Confluent Golang license #
##############################################################################
LICENSE
--------------------------------------------------------------
Confluent's Golang Client for Apache Kafka
Copyright (c) 2016, Confluent Inc.
All rights reserved.
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2016 Confluent Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
##############################################################################
# Blizzard node-rdkafka license #
##############################################################################
LICENSE
--------------------------------------------------------------
node-rdkafka - Node.js wrapper for Kafka C/C++ library
The MIT License (MIT)
Copyright (c) 2016 Blizzard Entertainment
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.

73
quickstart/go/README.md Normal file

@@ -0,0 +1,73 @@
# Send and Receive Messages in Go using Azure Event Hubs for Apache Kafka Ecosystems
This quickstart will show how to create and connect to an Event Hubs Kafka endpoint using an example producer and consumer written in Go. Azure Event Hubs for Apache Kafka Ecosystems supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
This sample is based on [Confluent's Apache Kafka Golang client](https://github.com/confluentinc/confluent-kafka-go), modified for use with Event Hubs for Kafka.
## Prerequisites
If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.
In addition:
* [Go Tools Distribution](https://golang.org/doc/install)
* [Git](https://www.git-scm.com/downloads)
* On Ubuntu, you can run `sudo apt-get install git` to install Git.
## Create an Event Hubs namespace
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
Additionally, topics in Kafka map to Event Hub instances, so create an Event Hub instance called "test" that our samples can send and receive messages from.
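If you prefer the command line, the namespace and the "test" hub can also be created with the Azure CLI (a sketch; the resource group, namespace, and location names are placeholders):
```bash
# Assumes the Azure CLI is installed and you are logged in (az login).
az eventhubs namespace create --resource-group myResourceGroup --name mynamespace --location eastus
az eventhubs eventhub create --resource-group myResourceGroup --namespace-name mynamespace --name test
```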
### FQDN
For this sample, you will need the connection string from the portal as well as the FQDN that points to your Event Hubs namespace. **The FQDN can be found within your connection string as follows**:
```
Endpoint=sb://{YOUR.EVENTHUBS.FQDN}/;SharedAccessKeyName={SHARED.ACCESS.KEY.NAME};SharedAccessKey={SHARED.ACCESS.KEY}
```
## Clone the example project
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the `quickstart/go` subfolder:
```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/quickstart/go
```
## Configuration
Define two environment variables: one for the fully qualified domain name and port of the Kafka head of your Event Hub, and one for its connection string.
```bash
$ export KAFKA_EVENTHUB_ENDPOINT="{YOUR.EVENTHUBS.FQDN}:9093"
$ export KAFKA_EVENTHUB_CONNECTION_STRING="{YOUR.EVENTHUBS.CONNECTION.STRING}"
```
## Producer
The producer sample demonstrates how to send messages to the Event Hubs service using the Kafka head.
You can run the sample via:
```bash
$ cd producer
$ go run producer.go
```
The producer will now begin sending events to the Kafka-enabled Event Hub on topic `test` and printing the events to stdout. If you would like to change the topic, change the topic variable in `producer.go`.
## Consumer
The consumer sample demonstrates how to receive messages from the Event Hubs service using the Kafka head.
You can run the sample via:
```bash
$ cd consumer
$ go run consumer.go
```
The consumer will now begin receiving events from the Kafka-enabled Event Hub on topic `test` and printing the events to stdout. If you would like to change the topic, change the topic variable in `consumer.go`.

46
quickstart/go/consumer/consumer.go Normal file

@@ -0,0 +1,46 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Copyright 2016 Confluent Inc.
//Licensed under the MIT License.
//Licensed under the Apache License, Version 2.0
//
//Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems
package main
import (
"fmt"
"os"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
consumerGroup := "consumergroup"
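// Event Hubs for Kafka requires SASL_SSL: the username is the literal string
// "$ConnectionString", and the password is the Event Hubs connection string.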
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": os.Getenv("KAFKA_EVENTHUB_ENDPOINT"),
"sasl.mechanisms": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.username": "$ConnectionString",
"sasl.password": os.Getenv("KAFKA_EVENTHUB_CONNECTION_STRING"),
"group.id": consumerGroup,
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
topics := []string{"test"}
c.SubscribeTopics(topics, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
break
}
}
c.Close()
}

55
quickstart/go/producer/producer.go Normal file

@@ -0,0 +1,55 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Copyright 2016 Confluent Inc.
//Licensed under the MIT License.
//Licensed under the Apache License, Version 2.0
//
//Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems
package main
import (
"fmt"
"os"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
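// As with the consumer, authenticate with SASL_SSL: the username is the
// literal string "$ConnectionString", the password is the connection string.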
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": os.Getenv("KAFKA_EVENTHUB_ENDPOINT"),
"sasl.mechanisms": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.username": "$ConnectionString",
"sasl.password": os.Getenv("KAFKA_EVENTHUB_CONNECTION_STRING"),
})
if err != nil {
panic(err)
}
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := "test"
for _, word := range []string{"Welcome", "to", "the", "Kafka", "head", "on", "Azure", "EventHubs"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
// Wait for message deliveries
p.Flush(60 * 1000)
}

96
quickstart/java/README.md Normal file

@@ -0,0 +1,96 @@
# Send and Receive Messages in Java using Azure Event Hubs for Apache Kafka Ecosystems
This quickstart will show how to create and connect to an Event Hubs Kafka endpoint using an example producer and consumer written in Java. Azure Event Hubs for Apache Kafka Ecosystems supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
## Prerequisites
If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.
In addition:
* [Java Development Kit (JDK) 1.7+](http://www.oracle.com/technetwork/java/javase/downloads/index.html)
* On Ubuntu, run `apt-get install default-jdk` to install the JDK.
* Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
* [Download](http://maven.apache.org/download.cgi) and [install](http://maven.apache.org/install.html) a Maven binary archive
* On Ubuntu, you can run `apt-get install maven` to install Maven.
* [Git](https://www.git-scm.com/downloads)
* On Ubuntu, you can run `sudo apt-get install git` to install Git.
## Create an Event Hubs namespace
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
### FQDN
For this sample, you will need the connection string from the portal as well as the FQDN that points to your Event Hubs namespace. **The FQDN can be found within your connection string as follows**:
```
Endpoint=sb://{YOUR.EVENTHUBS.FQDN}/;SharedAccessKeyName={SHARED.ACCESS.KEY.NAME};SharedAccessKey={SHARED.ACCESS.KEY}
```
## Clone the example project
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the `quickstart/java` subfolder:
```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/quickstart/java
```
## Producer
Using the provided producer example, send messages to the Event Hubs service. To change the Kafka version, change the dependency in the pom file to the desired version.
### Provide an Event Hubs Kafka endpoint
#### producer.config
Update the `bootstrap.servers` and `sasl.jaas.config` values in `producer/src/main/resources/producer.config` to direct the producer to the Event Hubs Kafka endpoint with the correct authentication.
```config
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
```
### Run producer from command line
To run the producer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):
```bash
mvn clean package
mvn exec:java -Dexec.mainClass="TestProducer"
```
The producer will now begin sending events to the Kafka-enabled Event Hub on topic `test` and printing the events to stdout. If you would like to change the topic, change the TOPIC constant in `producer/src/main/java/com/example/app/TestProducer.java`.
## Consumer
Using the provided consumer example, receive messages from the Kafka-enabled Event Hubs. To change the Kafka version, change the dependency in the pom file to the desired version.
### Provide an Event Hubs Kafka endpoint
#### consumer.config
Change the `bootstrap.servers` and `sasl.jaas.config` values in `consumer/src/main/resources/consumer.config` to direct the consumer to the Event Hubs endpoint with the correct authentication.
```config
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
```
### Run consumer from command line
To run the consumer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):
```bash
mvn clean package
mvn exec:java -Dexec.mainClass="TestConsumer"
```
If the Kafka-enabled Event Hub has incoming events (for instance, if your example producer is also running), then the consumer should now begin receiving events from topic `test`. If you would like to change the topic, change the TOPIC constant in `consumer/src/main/java/com/example/app/TestConsumer.java`.
By default, Kafka consumers will read from the end of the stream rather than the beginning. This means any events queued before you begin running your consumer will not be read. If you started your consumer but it isn't receiving any events, try running your producer again while your consumer is polling. Alternatively, you can use Kafka's [`auto.offset.reset` consumer config](https://kafka.apache.org/documentation/#newconsumerconfigs) to make your consumer read from the beginning of the stream!
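For example, adding a single line to `consumer/src/main/resources/consumer.config` makes a fresh consumer group start from the earliest available event:
```config
auto.offset.reset=earliest
```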

53
quickstart/java/consumer/pom.xml Normal file

@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-java-consumer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<!--v1.0-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<!--v1.1-->
<!-- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency> -->
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

20
quickstart/java/consumer/src/main/java/com/example/app/TestConsumer.java Normal file

@@ -0,0 +1,20 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestConsumer {
//Change constant to receive messages from the desired topic
private final static String TOPIC = "test";
private final static int NUM_THREADS = 1;
public static void main(String... args) throws Exception {
final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
for (int i = 0; i < NUM_THREADS; i++){
executorService.execute(new TestConsumerThread(TOPIC));
}
}
}

72
quickstart/java/consumer/src/main/java/com/example/app/TestConsumerThread.java Normal file

@@ -0,0 +1,72 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.FileReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
public class TestConsumerThread implements Runnable {
private final String TOPIC;
//Each consumer needs a unique client ID per thread
private static int id = 0;
public TestConsumerThread(final String TOPIC){
this.TOPIC = TOPIC;
}
public void run (){
final Consumer<Long, String> consumer = createConsumer();
System.out.println("Polling");
try {
while (true) {
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
for(ConsumerRecord<Long, String> cr : consumerRecords) {
System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", cr.key(), cr.value(), cr.partition(), cr.offset());
}
consumer.commitAsync();
}
} catch (CommitFailedException e) {
System.out.println("CommitFailedException: " + e);
} finally {
consumer.close();
}
}
private Consumer<Long, String> createConsumer() {
try {
final Properties properties = new Properties();
synchronized (TestConsumerThread.class) {
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "KafkaExampleConsumer#" + id);
id++;
}
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//Get remaining properties from config file
properties.load(new FileReader("src/main/resources/consumer.config"));
// Create the consumer using properties.
final Consumer<Long, String> consumer = new KafkaConsumer<>(properties);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
} catch (FileNotFoundException e){
System.out.println("FileNoteFoundException: " + e);
System.exit(1);
return null; //unreachable
} catch (IOException e){
System.out.println("IOException: " + e);
System.exit(1);
return null; //unreachable
}
}
}

6
quickstart/java/consumer/src/main/resources/consumer.config Normal file

@@ -0,0 +1,6 @@
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=$Default
request.timeout.ms=60000
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

53
quickstart/java/producer/pom.xml Normal file

@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-java-producer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<!--v1.0-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<!--v1.1-->
<!-- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency> -->
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

39
quickstart/java/producer/src/main/java/com/example/app/TestDataReporter.java Normal file

@@ -0,0 +1,39 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.sql.Timestamp;
public class TestDataReporter implements Runnable {
private static final int NUM_MESSAGES = 100;
private final String TOPIC;
private Producer<Long, String> producer;
public TestDataReporter(final Producer<Long, String> producer, String TOPIC) {
this.producer = producer;
this.TOPIC = TOPIC;
}
@Override
public void run() {
for(int i = 0; i < NUM_MESSAGES; i++) {
long time = System.currentTimeMillis();
System.out.println("Test Data #" + i + " from thread #" + Thread.currentThread().getId());
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data #" + i);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println(exception);
System.exit(1);
}
}
});
}
System.out.println("Finished sending " + NUM_MESSAGES + " messages from thread #" + Thread.currentThread().getId() + "!");
}
}

48
quickstart/java/producer/src/main/java/com/example/app/TestProducer.java Normal file

@@ -0,0 +1,48 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.io.FileReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestProducer {
//Change constant to send messages to the desired topic; for this example we use 'test'
private final static String TOPIC = "test";
private final static int NUM_THREADS = 1;
public static void main(String... args) throws Exception {
//Create Kafka Producer
final Producer<Long, String> producer = createProducer();
final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
//Run NUM_THREADS TestDataReporters
for (int i = 0; i < NUM_THREADS; i++)
executorService.execute(new TestDataReporter(producer, TOPIC));
}
private static Producer<Long, String> createProducer() {
try{
Properties properties = new Properties();
properties.load(new FileReader("src/main/resources/producer.config"));
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(properties);
} catch (Exception e){
System.out.println("Failed to create producer with exception: " + e);
System.exit(1);
return null; //unreachable
}
}
}

4
quickstart/java/producer/src/main/resources/producer.config Normal file

@@ -0,0 +1,4 @@
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

94
quickstart/node/README.md Normal file

@@ -0,0 +1,94 @@
# Send and Receive Messages in Node using Event Hubs for Apache Kafka Ecosystems
This quickstart will show how to create and connect to an Event Hubs Kafka endpoint using an example producer and consumer written in Node. Azure Event Hubs for Apache Kafka Ecosystems supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
This sample uses the [node-rdkafka](https://github.com/Blizzard/node-rdkafka) library. For instructions on configuring node-rdkafka for Windows, please visit the node-rdkafka project before continuing with the sample. This sample is untested on Windows, but it has been tested on the [Windows Subsystem for Linux on Windows 10](https://docs.microsoft.com/windows/wsl/install-win10).
## Prerequisites
If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.
In addition:
- [Node.js](https://nodejs.org)
- [OpenSSL](https://github.com/openssl/openssl)
- On macOS, you can run `brew install openssl`
- On Ubuntu, `sudo apt-get install libssl-dev`
- [Git](https://www.git-scm.com/downloads)
- On Ubuntu, you can run `sudo apt-get install git` to install Git.
## Create an Event Hubs namespace
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
Additionally, topics in Kafka map to Event Hub instances, so create an Event Hub instance called "test" that our samples can send and receive messages from.
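If you prefer the command line to the portal, the instance can also be created with the Azure CLI; this is a sketch, and the resource group and namespace names below are placeholders for your own:
```bash
az eventhubs eventhub create --resource-group {YOUR.RESOURCE.GROUP} --namespace-name {YOUR.EVENTHUBS.NAMESPACE} --name test
```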
### FQDN
For this sample, you will need the connection string from the portal as well as the FQDN that points to your Event Hub namespace. **The FQDN can be found within your connection string as follows**:
```
Endpoint=sb://{YOUR.EVENTHUBS.FQDN}/;SharedAccessKeyName={SHARED.ACCESS.KEY.NAME};SharedAccessKey={SHARED.ACCESS.KEY}
```
## Clone the example project
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the `quickstart/node` subfolder:
```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/quickstart/node
```
## Configuration
Configure the Kafka producer and consumer with the fully qualified domain name and port (9093) of your Event Hub's Kafka endpoint and its connection string. For the consumer, you will also need the consumer group for that 'topic' (event hub); the default consumer group in Azure Event Hubs is `$Default`.
```javascript
var producer = new Kafka.Producer({
//'debug' : 'all',
'metadata.broker.list': '{YOUR.EVENTHUBS.FQDN}:9093',
'dr_cb': true, //delivery report callback
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': '{YOUR.EVENTHUBS.CONNECTION.STRING}'
});
```
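The consumer (`consumer.js`) takes the same connection settings plus the consumer group; abridged from the sample:
```javascript
var stream = Kafka.KafkaConsumer.createReadStream({
    'metadata.broker.list': '{YOUR.EVENTHUBS.FQDN}:9093',
    'group.id': '$Default', //The default consumer group for Event Hubs is $Default
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': '{YOUR.EVENTHUBS.CONNECTION.STRING}'
}, {}, { topics: 'test' });
```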
This sample uses the [`node-rdkafka` module](https://github.com/Blizzard/node-rdkafka); to install the module into your project, run the command for your platform:
Linux:
```bash
npm install node-rdkafka
```
macOS:
```bash
CPPFLAGS=-I/usr/local/opt/openssl/include LDFLAGS=-L/usr/local/opt/openssl/lib npm install node-rdkafka
```
## Producer
The producer sample demonstrates how to send messages to the Event Hubs service using the Kafka head.
You can run the sample via:
```bash
$ node producer.js
```
The producer will now begin sending events to the Kafka-enabled Event Hub on topic `test`. If you would like to change the topic, change the `topicName` variable in `producer.js`.
## Consumer
The consumer sample demonstrates how to receive messages from the Event Hubs service using the Kafka head.
You can run the sample via:
```bash
$ node consumer.js
```
The consumer will now begin receiving events from the Kafka-enabled Event Hub on topic `test`. If you would like to change the topic, change the `topics` value in `consumer.js`. If the topic does not already exist, no messages will be sent or received.


@ -0,0 +1,42 @@
/* Copyright (c) Microsoft Corporation. All rights reserved.
* Copyright (c) 2016 Blizzard Entertainment
* Licensed under the MIT License.
*
* Original Blizzard node-rdkafka sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems
*/
var Kafka = require('node-rdkafka');

var stream = Kafka.KafkaConsumer.createReadStream({
    'metadata.broker.list': '{YOUR.EVENTHUBS.FQDN}:9093',
    'group.id': '$Default', //The default consumer group for Event Hubs is $Default
    'socket.keepalive.enable': true,
    'enable.auto.commit': false,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': '{YOUR.EVENTHUBS.CONNECTION.STRING}'
}, {}, {
    topics: 'test',
    waitInterval: 0,
    objectMode: false
});

//Pipe received messages straight to stdout
stream.pipe(process.stdout);

//Exit on stream errors
stream.on('error', function (err) {
    if (err) console.log(err);
    process.exit(1);
});

//Log errors emitted by the underlying consumer
stream.consumer.on('event.error', function (err) {
    console.log(err);
});

27
quickstart/node/package-lock.json generated Normal file

@ -0,0 +1,27 @@
{
"name": "kafka_node",
"version": "1.0.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"bindings": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/bindings/-/bindings-1.3.0.tgz",
"integrity": "sha512-DpLh5EzMR2kzvX1KIlVC0VkC3iZtHKTgdtZ0a3pglBZdaQFjt5S9g9xd1lE+YvXyfd6mtCeRnrUfOLYiTMlNSw=="
},
"nan": {
"version": "2.10.0",
"resolved": "https://registry.npmjs.org/nan/-/nan-2.10.0.tgz",
"integrity": "sha512-bAdJv7fBLhWC+/Bls0Oza+mvTaNQtP+1RyhhhvD95pgUJz6XM5IzgmxOkItJ9tkoCiplvAnXI1tNmmUD/eScyA=="
},
"node-rdkafka": {
"version": "2.3.3",
"resolved": "https://registry.npmjs.org/node-rdkafka/-/node-rdkafka-2.3.3.tgz",
"integrity": "sha512-2J54zC9+Zj0iRQttmQs1Ubv8aHhmh04XjP3vk39uco7l6tp8BYYHG4XRsoqKOGGKjBLctGpFHr9g97WBE1pTbg==",
"requires": {
"bindings": "1.3.0",
"nan": "2.10.0"
}
}
}
}


@ -0,0 +1,15 @@
{
"name": "kafka_node",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "MIT",
"dependencies": {
"node-rdkafka": "^2.3.3"
}
}


@ -0,0 +1,70 @@
/* Copyright (c) Microsoft Corporation. All rights reserved.
* Copyright (c) 2016 Blizzard Entertainment
* Licensed under the MIT License.
*
* Original Blizzard node-rdkafka sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems
*/
var Kafka = require('node-rdkafka');

var producer = new Kafka.Producer({
    //'debug' : 'all',
    'metadata.broker.list': '{YOUR.EVENTHUBS.FQDN}:9093',
    'dr_cb': true, //delivery report callback
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': '{YOUR.EVENTHUBS.CONNECTION.STRING}'
});

var topicName = 'test';

//logging debug messages, if debug is enabled
producer.on('event.log', function(log) {
    console.log(log);
});

//logging all errors
producer.on('event.error', function(err) {
    console.error('Error from producer');
    console.error(err);
});

//counter to stop this sample after maxMessages are sent
var counter = 0;
var maxMessages = 10;

producer.on('delivery-report', function(err, report) {
    console.log('delivery-report: ' + JSON.stringify(report));
    counter++;
});

//Wait for the ready event before producing
producer.on('ready', function(arg) {
    console.log('producer ready.' + JSON.stringify(arg));
    for (var i = 0; i < maxMessages; i++) {
        var value = Buffer.from(`{"name" : "person${i}"}`);
        var key = "key-" + i;
        // if partition is set to -1, librdkafka will use the default partitioner
        var partition = -1;
        producer.produce(topicName, partition, value, key);
    }
    //need to keep polling for a while to ensure the delivery reports are received
    var pollLoop = setInterval(function() {
        producer.poll();
        if (counter === maxMessages) {
            clearInterval(pollLoop);
            producer.disconnect();
        }
    }, 1000);
});

producer.on('disconnected', function(arg) {
    console.log('producer disconnected. ' + JSON.stringify(arg));
});

//starting the producer
producer.connect();


@ -0,0 +1,85 @@
# Using Confluent's Python Kafka client and librdkafka with Event Hubs for Apache Kafka Ecosystems
This quickstart will show how to create and connect to an Event Hubs Kafka endpoint using an example producer and consumer written in Python. Azure Event Hubs for Apache Kafka Ecosystems supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
This sample is based on [Confluent's Apache Kafka Python client](https://github.com/confluentinc/confluent-kafka-python), modified for use with Event Hubs for Kafka.
## Prerequisites
If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.
In addition:
* [Git](https://www.git-scm.com/downloads)
* [Python](https://www.python.org/downloads/)
* [Pip](https://pypi.org/project/pip/)
* [OpenSSL](https://www.openssl.org/) (including libssl)
* [librdkafka](https://github.com/edenhill/librdkafka)
Running the setup script provided in this repo will install and configure all of the required dependencies.
## Create an Event Hubs namespace
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
### FQDN
For this sample, you will need the connection string from the portal as well as the FQDN that points to your Event Hub namespace. **The FQDN can be found within your connection string as follows**:
```
Endpoint=sb://{YOUR.EVENTHUBS.FQDN}/;SharedAccessKeyName={SHARED.ACCESS.KEY.NAME};SharedAccessKey={SHARED.ACCESS.KEY}
```
## Set up
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the `quickstart/python` subfolder:
```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/quickstart/python
```
Now run the set up script:
```shell
source setup.sh
```
## Producer
### Update the configuration
Update `bootstrap.servers` and `sasl.password` in `producer.py` to direct the producer to the Event Hubs Kafka endpoint with the correct authentication.
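The values to change live in the `conf` dictionary near the top of `producer.py`:
```python
conf = {
    'bootstrap.servers': '{YOUR.EVENTHUBS.FQDN}:9093',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': '{YOUR.EVENTHUBS.CONNECTION.STRING}',
    'client.id': 'python-example-producer'
}
```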
### Run the producer from the command line
```shell
python producer.py <topic>
```
Note that the topic must already exist or else you will see an "Unknown topic or partition" error.
## Consumer
### Update the configuration
Update `bootstrap.servers` and `sasl.password` in `consumer.py` to direct the consumer to the Event Hubs Kafka endpoint with the correct authentication.
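As with the producer, the settings live in the `conf` dictionary in `consumer.py`; note the additional `group.id`, which is read from the first command-line argument:
```python
conf = {
    'bootstrap.servers': '{YOUR.EVENTHUBS.FQDN}:9093',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': '{YOUR.EVENTHUBS.CONNECTION.STRING}',
    'group.id': group,
    'client.id': 'python-example-consumer',
    'request.timeout.ms': 60000,
    'default.topic.config': {'auto.offset.reset': 'smallest'}
}
```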
### Run the consumer from the command line
```shell
python consumer.py <your-consumer-group> <topic.1> <topic.2> ... <topic.n>
```
## Troubleshooting
This tutorial has a lot of dependencies, so we recommend using the provided setup script. If that doesn't work, here are some potential solutions:
* Run `$ sudo apt-get purge librdkafka1` and rerun the setup script.
* Clone Confluent's Python Kafka library from GitHub and install that instead of the official pip package:
```shell
git clone https://github.com/confluentinc/confluent-kafka-python
pip uninstall confluent-kafka
pip install ./confluent-kafka-python/
```


@ -0,0 +1,112 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, Version 2.0
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems
from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import getopt
import json
import logging
from pprint import pformat


def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))


def print_usage_and_exit(program_name):
    sys.stderr.write('Usage: %s [options..] <consumer-group> <topic1> <topic2> ..\n' % program_name)
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)


if __name__ == '__main__':
    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    if len(argv) < 2:
        print_usage_and_exit(sys.argv[0])

    group = argv[0]
    topics = argv[1:]

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': '{YOUR.EVENTHUBS.FQDN}:9093',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '{YOUR.EVENTHUBS.CONNECTION.STRING}',
        'group.id': group,
        'client.id': 'python-example-consumer',
        'request.timeout.ms': 60000,
        'default.topic.config': {'auto.offset.reset': 'smallest'}
    }

    # Check to see if -T option exists
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
            sys.exit(1)
        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
            sys.exit(1)
        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = intval

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)

    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    c.subscribe(topics, on_assign=print_assignment)

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=100.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                print(msg.value())
    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')
    finally:
        # Close down consumer to commit final offsets.
        c.close()


@ -0,0 +1,52 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, Version 2.0
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems
from confluent_kafka import Producer
import sys

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
        sys.exit(1)
    topic = sys.argv[1]

    #Producer configuration
    #See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    #See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
    conf = {
        'bootstrap.servers': '{YOUR.EVENTHUBS.FQDN}:9093',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '{YOUR.EVENTHUBS.CONNECTION.STRING}',
        'client.id': 'python-example-producer'
    }

    #Create Producer instance
    p = Producer(**conf)

    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' % (msg.topic(), msg.partition(), msg.offset()))

    #Write messages 0-99 to the topic
    for i in range(0, 100):
        try:
            p.produce(topic, str(i), callback=delivery_callback)
        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        p.poll(0)

    #Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()


@ -0,0 +1,15 @@
#!/bin/bash
echo "Downloading/installing necessary dependencies"
sudo apt-get install git openssl libssl-dev build-essential python-pip python-dev librdkafka-dev
git clone https://github.com/edenhill/librdkafka
echo "Setting up librdkafka"
cd librdkafka
./configure
make
sudo make install
cd ..
echo "Setting up Confluent's Python Kafka library"
sudo pip install confluent-kafka
echo "Try running the samples now!"


@ -0,0 +1,117 @@
# Using Akka Streams with Event Hubs for Apache Kafka Ecosystems (Java)
This tutorial will show how to connect Akka Streams to Kafka-enabled Event Hubs without changing your protocol clients or running your own clusters. Azure Event Hubs for Apache Kafka Ecosystems supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
## Prerequisites
If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.
In addition:
* [Java Development Kit (JDK) 1.8+](http://www.oracle.com/technetwork/java/javase/downloads/index.html)
* On Ubuntu, run `apt-get install default-jdk` to install the JDK.
* Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
* [Download](http://maven.apache.org/download.cgi) and [install](http://maven.apache.org/install.html) a Maven binary archive
* On Ubuntu, you can run `apt-get install maven` to install Maven.
* [Git](https://www.git-scm.com/downloads)
* On Ubuntu, you can run `sudo apt-get install git` to install Git.
## Create an Event Hubs namespace
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
### FQDN
For this sample, you will need the connection string from the portal as well as the FQDN that points to your Event Hub namespace. **The FQDN can be found within your connection string as follows**:
```
Endpoint=sb://{YOUR.EVENTHUBS.FQDN}/;SharedAccessKeyName={SHARED.ACCESS.KEY.NAME};SharedAccessKey={SHARED.ACCESS.KEY}
```
## Clone the example project
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs repository and navigate to the `tutorials/akka/java` subfolder:
```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/java
```
## Akka Streams Producer
Using the provided Akka Streams producer example, send messages to the Event Hubs service.
### Provide an Event Hubs Kafka endpoint
#### Producer application.conf
Update the `bootstrap.servers` and `sasl.jaas.config` values in `producer/src/main/resources/application.conf` to direct the producer to the Event Hubs Kafka endpoint with the correct authentication. Note that Akka Streams automatically uses the `application.conf` file to load configurations, so changing its name or location will break this tutorial unless other steps to load configurations are taken.
```config
akka.kafka.producer {
#Akka Kafka producer properties can be defined here
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
```
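If you do need to load the configuration from a non-default name or location, one of those "other steps" could look like the following sketch, which uses the Typesafe Config API that ships with Akka (`my-custom.conf` is a hypothetical file name):
```java
import java.io.File;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class CustomConfigExample {
    public static void main(String[] args) {
        //Parse a config file from a custom location, falling back to the
        //regular application.conf/reference.conf for anything not overridden
        Config config = ConfigFactory.parseFile(new File("my-custom.conf"))
                .withFallback(ConfigFactory.load())
                .resolve();
        ActorSystem system = ActorSystem.create("example", config);
    }
}
```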
### Run producer from the command line
To run the producer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):
```bash
mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestProducer"
```
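If you take the second route, the invocation after `mvn clean package` would look roughly like this; the jar name comes from the producer's `pom.xml`, and the dependency directory is a placeholder for wherever the Akka and Kafka jars live on your machine:
```bash
java -cp "target/event-hubs-kafka-akka-producer-1.0-SNAPSHOT.jar:/path/to/dependency/jars/*" AkkaTestProducer
```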
The producer will now begin sending events to the Kafka-enabled Event Hub at topic `test` and printing the events to stdout. If you would like to change the topic, change the TOPIC constant in `producer/src/main/java/com/example/app/AkkaTestProducer.java`.
## Akka Streams Consumer
Using the provided consumer example, receive messages from the Kafka-enabled Event Hubs.
### Provide an Event Hubs Kafka endpoint
#### Consumer application.conf
Update the `bootstrap.servers` and `sasl.jaas.config` values in `consumer/src/main/resources/application.conf` to direct the consumer to the Event Hubs Kafka endpoint with the correct authentication. Note that Akka Streams automatically uses the `application.conf` file to load configurations, so changing its name or location will break this tutorial unless other steps to load configurations are taken.
```config
akka.kafka.consumer {
#Akka Kafka consumer properties defined here
wakeup-timeout=60s
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# defined in this configuration section.
kafka-clients {
request.timeout.ms=60000
group.id=akka-example-consumer
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
```
### Run consumer from the command line
To run the consumer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):
```bash
mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestConsumer"
```
If the Kafka-enabled Event Hub has events (for instance, if your producer is also running), then the consumer should now begin receiving events from topic `test`. If you would like to change the topic, change the TOPIC constant in `consumer/src/main/java/com/example/app/AkkaTestConsumer.java`.
Check out the [Akka Streams Kafka Guide](https://doc.akka.io/docs/akka-stream-kafka/current/home.html) for more detailed information on Akka Streams.


@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-akka-consumer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_2.12</artifactId>
<version>0.20</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.12</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,63 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.*;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
abstract class AbstractConsumer {
protected final ActorSystem system = ActorSystem.create("example");
protected final Materializer materializer = ActorMaterializer.create(system);
// Consumer settings
protected ConsumerSettings<byte[], String> consumerSettings = ConsumerSettings
.create(system, new ByteArrayDeserializer(), new StringDeserializer());
// DB
static class DB {
private final AtomicLong offset = new AtomicLong();
public CompletionStage<Done> save(ConsumerRecord<byte[], String> record) {
System.out.println("DB.save: " + record.value());
offset.set(record.offset());
return CompletableFuture.completedFuture(Done.getInstance());
}
public CompletionStage<Long> loadOffset() {
return CompletableFuture.completedFuture(offset.get());
}
public CompletionStage<Done> update(String data) {
System.out.println(data);
return CompletableFuture.completedFuture(Done.getInstance());
}
}
}
public class AkkaTestConsumer extends AbstractConsumer {
private final static String TOPIC = "test";
public static void main(String[] args) {
new AkkaTestConsumer().demo();
}
//Consumes each message from TOPIC at least once
public void demo() {
final DB db = new DB();
akka.kafka.javadsl.Consumer.committableSource(consumerSettings, Subscriptions.topics(TOPIC))
.mapAsync(1, msg -> db.update(msg.record().value()).thenApply(done -> msg))
.mapAsync(1, msg -> msg.committableOffset().commitJavadsl())
.runWith(Sink.foreach(p -> System.out.println(p)), materializer);
}
}


@ -0,0 +1,16 @@
akka.kafka.consumer {
#Akka Kafka consumer properties defined here
wakeup-timeout=60s
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# defined in this configuration section.
kafka-clients {
request.timeout.ms=60000
group.id=akka-example-consumer
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}


@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-akka-producer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_2.12</artifactId>
<version>0.20</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.12</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,61 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import akka.stream.StreamLimitReachedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.concurrent.CompletionStage;
abstract class AbstractProducer {
protected final ActorSystem system = ActorSystem.create("example");
protected final Materializer materializer = ActorMaterializer.create(system);
protected final ProducerSettings<byte[], String> producerSettings = ProducerSettings
.create(system, new ByteArraySerializer(), new StringSerializer());
protected final KafkaProducer<byte[], String> kafkaProducer = producerSettings.createKafkaProducer();
protected final static int RANGE = 100;
protected void terminateWhenDone(CompletionStage<Done> result) {
result.exceptionally(e -> {
if (e instanceof StreamLimitReachedException){
System.out.println("Sent " + RANGE + " messages!");
system.terminate();
}
else
system.log().error(e, e.getMessage());
return Done.getInstance();
})
.thenAccept(d -> system.terminate());
}
}
public class AkkaTestProducer extends AbstractProducer {
private static final String TOPIC = "test";
public static void main(String[] args) {
new AkkaTestProducer().demo();
}
public void demo() {
System.out.println("Sending");
// Sends integer 1-100 to Kafka topic "test"
CompletionStage<Done> done = Source
.range(1, RANGE)
.limit(RANGE - 1)
.map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>(TOPIC, elem))
.runWith(Producer.plainSink(producerSettings, kafkaProducer), materializer);
terminateWhenDone(done);
}
}


@ -0,0 +1,13 @@
akka.kafka.producer {
#Akka kafka producer properties can be defined here
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}


@ -0,0 +1,115 @@
# Using Akka Streams with Event Hubs for Apache Kafka Ecosystems (Scala)
This tutorial will show how to connect Akka Streams to Kafka-enabled Event Hubs without changing your protocol clients or running your own clusters. Azure Event Hubs for Apache Kafka Ecosystems supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
## Prerequisites
If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.
In addition:
* [Java Development Kit (JDK) 1.8+](http://www.oracle.com/technetwork/java/javase/downloads/index.html)
* On Ubuntu, run `apt-get install default-jdk` to install the JDK.
* Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
* [sbt](https://www.scala-sbt.org/download.html)
* On Ubuntu, follow the directions at [https://www.scala-sbt.org/release/docs/Installing-sbt-on-Linux.html](https://www.scala-sbt.org/release/docs/Installing-sbt-on-Linux.html)
* [Git](https://www.git-scm.com/downloads)
* On Ubuntu, you can run `sudo apt-get install git` to install Git.
## Create an Event Hubs namespace
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
### FQDN
For this sample, you will need the connection string from the portal as well as the FQDN that points to your Event Hub namespace. **The FQDN can be found within your connection string as follows**:
```
Endpoint=sb://{YOUR.EVENTHUBS.FQDN}/;SharedAccessKeyName={SHARED.ACCESS.KEY.NAME};SharedAccessKey={SHARED.ACCESS.KEY}
```
## Clone the example project
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs repository and navigate to the `tutorials/akka/scala` subfolder:
```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/scala
```
## Akka Streams Producer
Using the provided Akka Streams producer example, send messages to the Event Hubs service.
### Provide an Event Hubs Kafka endpoint
#### Producer application.conf
Update the `bootstrap.servers` and `sasl.jaas.config` values in `producer/resources/application.conf` to direct the producer to the Event Hubs Kafka endpoint with the correct authentication. Note that Akka Streams automatically uses the `application.conf` file to load configurations, so changing its name or location will break this tutorial unless other steps to load configurations are taken.
```config
akka.kafka.producer {
#Akka Kafka producer properties can be defined here
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
```
### Run producer from the command line
To run the producer from the command line:
```bash
sbt run
```
The producer will now begin sending events to the Kafka-enabled Event Hub at topic `test` and printing each message it sends to stdout. If you would like to change the topic, change the `topic` variable in `producer/scala/ProducerMain.scala`.
## Akka Streams Consumer
Using the provided consumer example, receive messages from the Kafka-enabled Event Hubs.
### Provide an Event Hubs Kafka endpoint
#### Consumer application.conf
Update the `bootstrap.servers` and `sasl.jaas.config` values in `consumer/resources/application.conf` to direct the consumer to the Event Hubs Kafka endpoint with the correct authentication. Note that Akka Streams automatically uses the `application.conf` file to load configurations, so changing its name or location will break this tutorial unless other steps to load configurations are taken.
```config
akka.kafka.consumer {
#Akka Kafka consumer properties defined here
wakeup-timeout=60s
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# defined in this configuration section.
kafka-clients {
request.timeout.ms=60000
group.id=akka-example-consumer
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
```
### Run consumer from the command line
To run the consumer from the command line:
```bash
sbt run
```
If the Kafka-enabled Event Hub has events (for instance, if your producer is also running), then the consumer should now begin receiving events from topic `test`. If you would like to change the topic, change the `topic` variable in `consumer/scala/ConsumerMain.scala`.
Check out the [Akka Streams Kafka Guide](https://doc.akka.io/docs/akka-stream-kafka/current/home.html) for more detailed information on Akka Streams.


@ -0,0 +1,7 @@
name := "scala-event-hubs-consumer"
version := "0.1"
scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"


@ -0,0 +1,16 @@
akka.kafka.consumer {
#Akka Kafka consumer properties defined here
wakeup-timeout=60s
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# defined in this configuration section.
kafka-clients {
request.timeout.ms=60000
group.id=akka-example-consumer
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}


@ -0,0 +1,33 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.Future
import scala.language.postfixOps
object ConsumerMain {
def main(args: Array[String]): Unit = {
implicit val system:ActorSystem = ActorSystem.apply("akka-stream-kafka")
implicit val materializer:ActorMaterializer = ActorMaterializer()
// grab our settings from the resources/application.conf file
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
// our topic to subscribe to for messages
val topic = "test"
// listen to our topic with our settings, until the program is exited
Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1) ( msg => {
// print out our message once it's received
println(s"Message Received : ${msg.timestamp} - ${msg.value}")
Future.successful(msg)
}).runWith(Sink.ignore)
}
}


@ -0,0 +1,7 @@
name := "scala-event-hubs-producer"
version := "0.1"
scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"


@ -0,0 +1,13 @@
akka.kafka.producer {
#Akka kafka producer properties can be defined here
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}


@ -0,0 +1,43 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import java.util.concurrent.TimeUnit.SECONDS
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.duration.FiniteDuration
import scala.language.postfixOps
object ProducerMain {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem.apply("akka-stream-kafka")
implicit val materializer: ActorMaterializer = ActorMaterializer()
// grab our settings from the resources/application.conf file
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
// topic to send the message to on event hubs
val topic = "test"
// loop until the program reaches 100
Source(1 to 100)
.throttle(1, FiniteDuration(1, SECONDS), 1, ThrottleMode.Shaping)
.map(num => {
//construct our message here
val message = s"Akka Scala Producer Message # ${num}"
println(s"Message sent to topic - $topic - $message")
new ProducerRecord[Array[Byte], String](topic, message.getBytes, message)
})
.runWith(Producer.plainSink(producerSettings))
.onComplete(_ => {
println("All messages sent!")
system.terminate()
})(scala.concurrent.ExecutionContext.global)
}
}

102
tutorials/flink/README.md Normal file

@ -0,0 +1,102 @@
# Using Apache Flink with Event Hubs for Apache Kafka Ecosystems
This tutorial will show how to connect Apache Flink to Kafka-enabled Event Hubs without changing your protocol clients or running your own clusters. Azure Event Hubs for Apache Kafka Ecosystems supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
## Prerequisites
If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/en-us/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.
In addition:
* [Java Development Kit (JDK) 1.7+](http://www.oracle.com/technetwork/java/javase/downloads/index.html)
* On Ubuntu, run `apt-get install default-jdk` to install the JDK.
* Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
* [Download](http://maven.apache.org/download.cgi) and [install](http://maven.apache.org/install.html) a Maven binary archive
* On Ubuntu, you can run `apt-get install maven` to install Maven.
* [Git](https://www.git-scm.com/downloads)
* On Ubuntu, you can run `sudo apt-get install git` to install Git.
## Create an Event Hubs namespace
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
### FQDN
For this sample, you will need the connection string from the portal as well as the FQDN that points to your Event Hub namespace. **The FQDN can be found within your connection string as follows**:
```
Endpoint=sb://{YOUR.EVENTHUBS.FQDN}/;SharedAccessKeyName={SHARED.ACCESS.KEY.NAME};SharedAccessKey={SHARED.ACCESS.KEY}
```
## Clone the example project
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the `tutorials/flink` subfolder:
```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink
```
## Flink Producer
Using the provided Flink producer example, send messages to the Event Hubs service.
### Provide an Event Hubs Kafka endpoint
#### producer.config
Update the `bootstrap.servers` and `sasl.jaas.config` values in `producer/src/main/resources/producer.config` to direct the producer to the Event Hubs Kafka endpoint with the correct authentication.
```
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
```
### Run producer from the command line
To run the producer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):
```bash
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"
```
The producer will now begin sending events to the Kafka-enabled Event Hub at topic `test` and printing the events to stdout. If you would like to change the topic, change the TOPIC constant in `producer/src/main/java/com/example/app/FlinkTestProducer.java`.
## Flink Consumer
Using the provided consumer example, receive messages from the Kafka-enabled Event Hubs.
### Provide an Event Hubs Kafka endpoint
#### consumer.config
Update the `bootstrap.servers` and `sasl.jaas.config` values in `consumer/src/main/resources/consumer.config` to direct the consumer to the Event Hubs Kafka endpoint with the correct authentication.
```config
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
```
### Run consumer from the command line
To run the consumer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):
```bash
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"
```
If the Kafka-enabled Event Hub has events (for instance, if your producer is also running), then the consumer should now begin receiving events from topic `test`. If you would like to change the topic, change the TOPIC constant in `consumer/src/main/java/com/example/app/FlinkTestConsumer.java`.
Check out [Flink's Kafka Connector Guide](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html) for more detailed information on connecting Flink to Kafka.


@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-flink-consumer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.2</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,33 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; //Kafka v0.11.0.0
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;
public class FlinkTestConsumer {
private static final String TOPIC = "test";
private static final String FILE_PATH = "src/main/resources/consumer.config";
public static void main(String... args) {
try {
//Load properties from config file
Properties properties = new Properties();
properties.load(new FileReader(FILE_PATH));
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), properties));
stream.print();
env.execute("Testing flink consumer");
} catch(FileNotFoundException e){
System.out.println("FileNotFoundException: " + e);
} catch (Exception e){
System.out.println("Failed with exception " + e);
}
}
}


@ -0,0 +1,7 @@
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
request.timeout.ms=60000
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";


@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.app</groupId>
<artifactId>event-hubs-kafka-flink-producer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.2</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,48 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; //v0.11.0.0
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;
public class FlinkTestProducer {
private static final String TOPIC = "test";
private static final String FILE_PATH = "src/main/resources/producer.config";
public static void main(String... args) {
try {
Properties properties = new Properties();
properties.load(new FileReader(FILE_PATH));
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = createStream(env);
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
TOPIC,
new SimpleStringSchema(), // serialization schema
properties);
stream.addSink(myProducer);
env.execute("Testing flink print");
} catch(FileNotFoundException e){
System.out.println("FileNotFoundException: " + e);
} catch (Exception e) {
System.out.println("Failed with exception: " + e);
}
}
public static DataStream<String> createStream(StreamExecutionEnvironment env){
return env.generateSequence(0, 200)
.map(new MapFunction<Long, String>() {
@Override
public String map(Long in) {
return "FLINK PRODUCE " + in;
}
});
}
}


@ -0,0 +1,6 @@
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";


@ -0,0 +1,94 @@
# Using Apache Kafka's MirrorMaker with Event Hubs for Apache Kafka Ecosystems
![MirrorMaker Diagram](./mirror-maker-graphic.PNG)
One major consideration for modern cloud-scale apps is the ability to update, improve, and change infrastructure without interrupting service. In this tutorial, we show how a Kafka-enabled Event Hub and Kafka MirrorMaker can integrate an existing Kafka pipeline into Azure by "mirroring" the Kafka input stream in the Event Hubs service.
## Prerequisites
To complete this tutorial, make sure you have:
* An Azure subscription. If you do not have one, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.
* [Java Development Kit (JDK) 1.7+](http://www.oracle.com/technetwork/java/javase/downloads/index.html)
* On Ubuntu, run `apt-get install default-jdk` to install the JDK.
* Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
* [Download](http://maven.apache.org/download.cgi) and [install](http://maven.apache.org/install.html) a Maven binary archive
* On Ubuntu, you can run `apt-get install maven` to install Maven.
* [Git](https://www.git-scm.com/downloads)
* On Ubuntu, you can run `sudo apt-get install git` to install Git.
## Create an Event Hubs namespace
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create a Kafka-enabled Event Hub](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
### FQDN
For this sample, you will need the connection string from the portal as well as the FQDN that points to your Event Hub namespace. **The FQDN can be found within your connection string as follows**:
```
Endpoint=sb://{YOUR.EVENTHUBS.FQDN}/;SharedAccessKeyName={SHARED.ACCESS.KEY.NAME};SharedAccessKey={SHARED.ACCESS.KEY}
```
## Clone the example project
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the `tutorials/mirror-maker` subfolder:
```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/mirror-maker
```
## Set up a Kafka cluster
Use the [Kafka quickstart guide](https://kafka.apache.org/quickstart) to set up a cluster with the desired settings (or use an existing Kafka cluster).
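For reference, a minimal single-node test cluster amounts to the following quickstart commands, run from the root of the Kafka download with the default configs (the topic name is just an example):
```bash
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mirrored-topic
```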
## Kafka MirrorMaker
Kafka MirrorMaker allows for the "mirroring" of a stream. Given source and destination Kafka clusters, MirrorMaker ensures that any message sent to the source cluster is received by both the source *and* the destination cluster. In this example, we'll show how to mirror a source Kafka cluster with a destination Kafka-enabled Event Hub. This scenario can be used to send data from an existing Kafka pipeline to Event Hubs without interrupting the flow of data.
Check out the [Kafka Mirroring/MirrorMaker Guide](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330) for more detailed information on Kafka MirrorMaker.
### Configuration
To configure Kafka MirrorMaker, we'll give it a Kafka cluster as its consumer/source and a Kafka-enabled Event Hub as its producer/destination.
#### Consumer Configuration
Update the consumer configuration file `source-kafka.config`, which tells MirrorMaker the properties of the source Kafka cluster.
##### source-kafka.config
```config
bootstrap.servers={SOURCE.KAFKA.IP.ADDRESS1}:{SOURCE.KAFKA.PORT1},{SOURCE.KAFKA.IP.ADDRESS2}:{SOURCE.KAFKA.PORT2},etc
group.id=example-mirrormaker-group
exclude.internal.topics=true
client.id=mirror_maker_consumer
```
#### Producer Configuration
Now update the producer config file `mirror-eventhub.config`, which tells MirrorMaker to send the duplicated (or "mirrored") data to the Event Hubs service. Specifically, change `bootstrap.servers` and `sasl.jaas.config` to point to your Event Hubs Kafka endpoint. The Event Hubs service requires secure (SASL) communication, which is achieved by setting the last three properties in the configuration below.
##### mirror-eventhub.config
```config
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=mirror_maker_producer
#Required for Event Hubs
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
```
### Run MirrorMaker
Run the Kafka MirrorMaker script from the root Kafka directory using the newly updated configuration files. Make sure to update the path of the config files (or copy them to the root Kafka directory) in the following command.
```bash
bin/kafka-mirror-maker.sh --consumer.config source-kafka.config --num.streams 1 --producer.config mirror-eventhub.config --whitelist=".*"
```
To verify that events are making it to the Kafka-enabled Event Hub, check out the ingress statistics in the [Azure portal](https://azure.microsoft.com/features/azure-portal/), or run a consumer against the Event Hub.
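For the latter, a console consumer pointed at the Event Hubs endpoint could look like the following sketch; it reuses the SASL settings in `mirror-eventhub.config` (you may need to add a `group.id`, such as `$Default`, to that file for consuming), and the topic name is a placeholder:
```bash
bin/kafka-console-consumer.sh --bootstrap-server {YOUR.EVENTHUBS.FQDN}:9093 --topic {YOUR.MIRRORED.TOPIC} --consumer.config mirror-eventhub.config
```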
Now that MirrorMaker is running, any events sent to the source Kafka cluster should be received by both the Kafka cluster *and* the mirrored Kafka-enabled Event Hub service. By using MirrorMaker and an Event Hubs Kafka endpoint, we can migrate an existing Kafka pipeline to the managed Azure Event Hubs service without changing the existing cluster or interrupting any ongoing data flow!


@ -0,0 +1,8 @@
# Event Hubs Kafka endpoint
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=mirror_maker_producer
# Event Hubs requires secure communication
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

tutorials/mirror-maker/mirror-maker-graphic.PNG Normal file (binary image, 40 KiB, added)


@ -0,0 +1,5 @@
#Simple Kafka setup
bootstrap.servers={SOURCE.KAFKA.IP.ADDRESS1}:{SOURCE.KAFKA.PORT1},{SOURCE.KAFKA.IP.ADDRESS2}:{SOURCE.KAFKA.PORT2},etc
client.id=mirror_maker_consumer
group.id=example-mirrormaker-group
exclude.internal.topics=true