Adding samples for documentation (#318)

* Added Kafka Output Functions

* Added samples for JavaScript runtime

* Kafka samples for typescript, powershell, java and python

* Kafka samples for dotnet

* Samples for dotnet-isolated

* Updated headers samples for typescript

* Modified Headers samples

* Modified all trigger functions with base64 decoded value

* Added function.json for eventhubs

* Added eventhub samples and updated confluent samples for java

* Added eventhub samples for dotnet and dotnet-isolated

* Updated local.settings.json and renamed to function.confluent.json

* Removed dotnet-isolated's headers functions

* Removed string and added KafkaEntity object in Kafka output binding for java runtime

* Removed post attribute for http trigger from function.json

* Added print statements in powershell kafka trigger functions

* Changed datatype from binary to string for JavaScript samples

* Corrected header samples for java

* Fixed samples with wrong configuration

* Reverted changes in pom.xml and updated libraries to newer versions

* Added entity folder

* Corrected extension version

* Added Headers section to readme

* Added note related to username and password for eventhub

* Updating function names in Readmes

* Updating readme

Co-authored-by: Shivam Rohilla <80326489+shrohilla@users.noreply.github.com>
krishna-kariya 2022-04-29 15:59:32 +05:30 committed by GitHub
Parent 9b1d14d267
Commit ba604e6f14
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
206 changed files: 6558 additions and 645 deletions


@@ -51,6 +51,8 @@ You can find all Kafka related configuration in `function.json`. In the case
_function.json_
#### For Confluent
```json
{
"scriptFile" : "../kafka-function-1.0-SNAPSHOT.jar",
@@ -72,12 +74,37 @@ _function.json_
}
```
#### For EventHub
```json
{
"scriptFile" : "../kafka-function-1.0-SNAPSHOT.jar",
"entryPoint" : "com.contoso.kafka.TriggerFunction.runMany",
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvents",
"password" : "%EventHubConnectionString%",
"protocol" : "SASLSSL",
"dataType" : "string",
"topic" : "message",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"cardinality" : "MANY",
"username" : "$ConnectionString",
"brokerList" : "%BrokerList%"
} ]
}
```
**NOTE** For EventHub, the username must be set to "$ConnectionString". The password should be the actual connection string value, which can be set in local.settings.json or in app settings (see the [local-settings](#localsettingsjson) section for more details).
### local.settings.json
This file configures the local Functions runtime. If you deploy the target application to Azure, you will need the same settings from `local.settings.json` in the Function App [App settings](https://docs.microsoft.com/en-us/azure/azure-functions/functions-how-to-use-azure-function-app-settings#settings).
For more details, refer to [Local settings file](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local?tabs=macos%2Ccsharp%2Cbash#local-settings-file).
**NOTE** We recommend putting all password and connection string settings in app settings. For more details, refer to [Local settings file](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local?tabs=v4%2Cmacos%2Ccsharp%2Cportal%2Cbash#local-settings).
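For example, on a deployed Function App these secrets can be pushed into app settings with the Azure CLI; a minimal sketch, assuming placeholder resource names:
```bash
az functionapp config appsettings set \
  --name <YOUR_FUNCTION_APP> \
  --resource-group <YOUR_RESOURCE_GROUP> \
  --settings "ConfluentCloudPassword=<YOUR_CONFLUENT_CLOUD_PASSWORD>"
```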
#### For Confluent
```json
{
"IsEncrypted": false,
@@ -85,12 +112,26 @@ For more details, refer to [Local settings file](https://docs.microsoft.com/en-u
"BrokerList": "{YOUR_CONFLUENT_CLOUD_BROKER}",
"ConfluentCloudUserName": "{YOUR_CONFLUENT_CLOUD_USERNAME}",
"ConfluentCloudPassword": "{YOUR_CONFLUENT_CLOUD_PASSWORD}",
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": ""
"FUNCTIONS_WORKER_RUNTIME": "<runtime>",
"AzureWebJobsStorage": "",
"topic": "{YOUR_KAFKA_TOPIC_NAME}"
}
}
```
#### For EventHub
```json
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "<runtime>",
"BrokerList": "<YOUR_EVENTHUB_NAMESPACE_NAME>.servicebus.windows.net:9093",
"EventHubConnectionString": "<YOUR_EVENTHUB_CONNECTIONSTRING>",
"topic": "{YOUR_KAFKA_TOPIC_NAME}"
}
}
```
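The Event Hubs connection string can be copied from the portal or, for example, retrieved with the Azure CLI; a sketch assuming the default RootManageSharedAccessKey rule:
```bash
az eventhubs namespace authorization-rule keys list \
  --resource-group <YOUR_RESOURCE_GROUP> \
  --namespace-name <YOUR_EVENTHUB_NAMESPACE_NAME> \
  --name RootManageSharedAccessKey \
  --query primaryConnectionString
```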
### Extension Bundle and install Kafka extension
In Azure Functions, most triggers and bindings are ordinarily obtained using the extension bundle. However, the Kafka extension is not yet part of the extension bundle (it will be added in the future), so for now you will have to install it manually.
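For a locally developed project, a manual install might look like the following; a sketch using Azure Functions Core Tools, where the version shown matches the one referenced elsewhere in this commit:
```bash
func extensions install --package Microsoft.Azure.WebJobs.Extensions.Kafka --version 3.3.3
```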
@@ -182,3 +223,19 @@ The sample provides a devcontainer profile. Open the folder in VsCode and perfor
}
}
```
### Headers
Headers are supported for both the Kafka trigger and the Kafka output binding. You can find the header samples in this folder under the names `KafkaTriggerWithHeaders` and `KafkaTriggerManyWithHeaders` for trigger functions, and `KafkaOutputWithHeaders` and `KafkaOutputManyWithHeaders` for output binding functions.
#### Output Binding Functions
`KafkaOutputWithHeaders` is a sample for single events, while `KafkaOutputManyWithHeaders` is for batches of events.
To run the `KafkaOutputWithHeaders` function, send an HTTP GET request with a message to `http://localhost:7071/api/KafkaOutputWithHeaders?message=<your_message>`. It creates a new Kafka event with `<your_message>` as the payload and `{ Key: 'test', Value: '<language>' }` as the headers.
Similarly, to run the `KafkaOutputManyWithHeaders` function, send an HTTP GET request to `http://localhost:7071/api/KafkaOutputManyWithHeaders`. It creates two messages with headers on the given topic.
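For example, with the Functions host running locally on its default port:
```bash
curl "http://localhost:7071/api/KafkaOutputWithHeaders?message=hello"
curl "http://localhost:7071/api/KafkaOutputManyWithHeaders"
```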
#### Trigger Functions
`KafkaTriggerWithHeaders` is a sample for single events, while `KafkaTriggerManyWithHeaders` is for batches of events.
`KafkaTriggerWithHeaders` fires whenever there is a Kafka event; it prints the message and the corresponding headers for that message.
Similarly, `KafkaTriggerManyWithHeaders` is a trigger function that processes a batch of Kafka events; for each event in the batch, it prints the message and the corresponding headers.
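For instance, the C# samples later in this commit log each consumed event in roughly this form (assuming a message `hello` produced by `KafkaOutputWithHeaders`):
```
C# Kafka trigger function processed a message: hello
Headers:
Key = test Value = dotnet
```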

samples/dotnet-isolated/confluent/.gitignore vendored Normal file

@@ -0,0 +1,264 @@
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
# Azure Functions localsettings file
local.settings.json
# User-specific files
*.suo
*.user
*.userosscache
*.sln.docstates
# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs
# Build results
[Dd]ebug/
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
x64/
x86/
bld/
[Bb]in/
[Oo]bj/
[Ll]og/
# Visual Studio 2015 cache/options directory
.vs/
# Uncomment if you have tasks that create the project's static files in wwwroot
#wwwroot/
# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*
# NUNIT
*.VisualState.xml
TestResult.xml
# Build Results of an ATL Project
[Dd]ebugPS/
[Rr]eleasePS/
dlldata.c
# DNX
project.lock.json
project.fragment.lock.json
artifacts/
*_i.c
*_p.c
*_i.h
*.ilk
*.meta
*.obj
*.pch
*.pdb
*.pgc
*.pgd
*.rsp
*.sbr
*.tlb
*.tli
*.tlh
*.tmp
*.tmp_proj
*.log
*.vspscc
*.vssscc
.builds
*.pidb
*.svclog
*.scc
# Chutzpah Test files
_Chutzpah*
# Visual C++ cache files
ipch/
*.aps
*.ncb
*.opendb
*.opensdf
*.sdf
*.cachefile
*.VC.db
*.VC.VC.opendb
# Visual Studio profiler
*.psess
*.vsp
*.vspx
*.sap
# TFS 2012 Local Workspace
$tf/
# Guidance Automation Toolkit
*.gpState
# ReSharper is a .NET coding add-in
_ReSharper*/
*.[Rr]e[Ss]harper
*.DotSettings.user
# JustCode is a .NET coding add-in
.JustCode
# TeamCity is a build add-in
_TeamCity*
# DotCover is a Code Coverage Tool
*.dotCover
# NCrunch
_NCrunch_*
.*crunch*.local.xml
nCrunchTemp_*
# MightyMoose
*.mm.*
AutoTest.Net/
# Web workbench (sass)
.sass-cache/
# Installshield output folder
[Ee]xpress/
# DocProject is a documentation generator add-in
DocProject/buildhelp/
DocProject/Help/*.HxT
DocProject/Help/*.HxC
DocProject/Help/*.hhc
DocProject/Help/*.hhk
DocProject/Help/*.hhp
DocProject/Help/Html2
DocProject/Help/html
# Click-Once directory
publish/
# Publish Web Output
*.[Pp]ublish.xml
*.azurePubxml
# TODO: Comment the next line if you want to checkin your web deploy settings
# but database connection strings (with potential passwords) will be unencrypted
#*.pubxml
*.publishproj
# Microsoft Azure Web App publish settings. Comment the next line if you want to
# checkin your Azure Web App publish settings, but sensitive information contained
# in these scripts will be unencrypted
PublishScripts/
# NuGet Packages
*.nupkg
# The packages folder can be ignored because of Package Restore
**/packages/*
# except build/, which is used as an MSBuild target.
!**/packages/build/
# Uncomment if necessary however generally it will be regenerated when needed
#!**/packages/repositories.config
# NuGet v3's project.json files produces more ignoreable files
*.nuget.props
*.nuget.targets
# Microsoft Azure Build Output
csx/
*.build.csdef
# Microsoft Azure Emulator
ecf/
rcf/
# Windows Store app package directories and files
AppPackages/
BundleArtifacts/
Package.StoreAssociation.xml
_pkginfo.txt
# Visual Studio cache files
# files ending in .cache can be ignored
*.[Cc]ache
# but keep track of directories ending in .cache
!*.[Cc]ache/
# Others
ClientBin/
~$*
*~
*.dbmdl
*.dbproj.schemaview
*.jfm
*.pfx
*.publishsettings
node_modules/
orleans.codegen.cs
# Since there are multiple workflows, uncomment next line to ignore bower_components
# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
#bower_components/
# RIA/Silverlight projects
Generated_Code/
# Backup & report files from converting an old project file
# to a newer Visual Studio version. Backup files are not needed,
# because we have git ;-)
_UpgradeReport_Files/
Backup*/
UpgradeLog*.XML
UpgradeLog*.htm
# SQL Server files
*.mdf
*.ldf
# Business Intelligence projects
*.rdl.data
*.bim.layout
*.bim_*.settings
# Microsoft Fakes
FakesAssemblies/
# GhostDoc plugin setting file
*.GhostDoc.xml
# Node.js Tools for Visual Studio
.ntvs_analysis.dat
# Visual Studio 6 build log
*.plg
# Visual Studio 6 workspace options file
*.opt
# Visual Studio LightSwitch build output
**/*.HTMLClient/GeneratedArtifacts
**/*.DesktopClient/GeneratedArtifacts
**/*.DesktopClient/ModelManifest.xml
**/*.Server/GeneratedArtifacts
**/*.Server/ModelManifest.xml
_Pvt_Extensions
# Paket dependency manager
.paket/paket.exe
paket-files/
# FAKE - F# Make
.fake/
# JetBrains Rider
.idea/
*.sln.iml
# CodeRush
.cr/
# Python Tools for Visual Studio (PTVS)
__pycache__/
*.pyc


@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
<OutputType>Exe</OutputType>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.3.0" OutputItemType="Analyzer"/>
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.5.2" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Kafka" Version="3.3.2" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.0.13" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="5.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
</None>
</ItemGroup>
</Project>


@@ -0,0 +1,47 @@
using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Worker.Http;
using System.Net;
namespace Confluent
{
public class KafkaOutput
{
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
}
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
}
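A quick local test, assuming the default Functions host port: `curl "http://localhost:7071/api/KafkaOutput?message=hello"` returns the HTTP response while the message is published to the configured topic.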


@@ -0,0 +1,46 @@
using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Worker.Http;
using System.Net;
namespace Confluent
{
public class KafkaOutputMany
{
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
}
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
}


@@ -0,0 +1,26 @@
using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Worker.Http;
using Newtonsoft.Json.Linq;
namespace Confluent
{
public class KafkaTrigger
{
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
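// The worker hands over the Kafka event as a JSON string; its "Value" property carries the message payload.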
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
}
}


@@ -0,0 +1,30 @@
using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Worker.Http;
using Newtonsoft.Json.Linq;
namespace Confluent
{
public class KafkaTriggerMany
{
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
}
}
}


@@ -0,0 +1,19 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Azure.Functions.Worker.Configuration;
namespace Confluent
{
public class Program
{
public static void Main()
{
var host = new HostBuilder()
.ConfigureFunctionsWorkerDefaults()
.Build();
host.Run();
}
}
}


@@ -0,0 +1,16 @@
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensions": {
"kafka": {
"maxBatchSize": 3
}
}
}


@@ -0,0 +1,11 @@
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"BrokerList": "{YOUR_CONFLUENT_CLOUD_BROKER}",
"ConfluentCloudUserName": "{YOUR_CONFLUENT_CLOUD_USERNAME}",
"ConfluentCloudPassword": "{YOUR_CONFLUENT_CLOUD_PASSWORD}",
"topic": "{YOUR_KAFKA_TOPIC_NAME}"
}
}

samples/dotnet-isolated/eventhub/.gitignore vendored Normal file

@@ -0,0 +1,264 @@
(contents identical to samples/dotnet-isolated/confluent/.gitignore above)


@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
<OutputType>Exe</OutputType>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.3.0" OutputItemType="Analyzer"/>
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.5.2" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Kafka" Version="3.3.2" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.0.13" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="5.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
</None>
</ItemGroup>
</Project>


@@ -0,0 +1,47 @@
using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Worker.Http;
using System.Net;
namespace Eventhub
{
public class KafkaOutput
{
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
}
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "EventHubConnectionString",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
}


@@ -0,0 +1,46 @@
using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Worker.Http;
using System.Net;
namespace Eventhub
{
public class KafkaOutputMany
{
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
}
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "EventHubConnectionString",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
}


@@ -0,0 +1,26 @@
using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Worker.Http;
using Newtonsoft.Json.Linq;
namespace Eventhub
{
public class KafkaTrigger
{
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "EventHubConnectionString",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
}
}


@@ -0,0 +1,30 @@
using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Worker.Http;
using Newtonsoft.Json.Linq;
namespace Eventhub
{
public class KafkaTriggerMany
{
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "EventHubConnectionString",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
}
}
}


@@ -0,0 +1,19 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Azure.Functions.Worker.Configuration;
namespace Eventhub
{
public class Program
{
public static void Main()
{
var host = new HostBuilder()
.ConfigureFunctionsWorkerDefaults()
.Build();
host.Run();
}
}
}


@@ -0,0 +1,16 @@
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensions": {
"kafka": {
"maxBatchSize": 3
}
}
}


@@ -0,0 +1,10 @@
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"BrokerList": "<YOUR_EVENTHUB_NAMESPACE_NAME>.servicebus.windows.net:9093",
"EventHubConnectionString": "<YOUR_EVENTHUB_CONNECTIONSTRING>",
"topic": "{YOUR_KAFKA_TOPIC_NAME}"
}
}

samples/dotnet/Confluent/.gitignore vendored Normal file

@@ -0,0 +1,264 @@
(contents identical to samples/dotnet-isolated/confluent/.gitignore above)


@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="3.3.3" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.4" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
</None>
</ItemGroup>
</Project>


@@ -0,0 +1,34 @@
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
namespace Confluent
{
public class KafkaOutput
{
[FunctionName("KafkaOutput")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out string eventData,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.Query["message"];
string responseMessage = "Ok";
eventData = message;
return new OkObjectResult(responseMessage);
}
}
}


@@ -0,0 +1,31 @@
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
namespace Confluent
{
public class KafkaOutputMany
{
[FunctionName("KafkaOutputMany")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out KafkaEventData<string>[] eventDataArr,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
eventDataArr = new KafkaEventData<string>[2];
eventDataArr[0] = new KafkaEventData<string>("one");
eventDataArr[1] = new KafkaEventData<string>("two");
return new OkObjectResult("Ok");
}
}
}


@@ -0,0 +1,33 @@
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
namespace Confluent
{
public class KafkaOutputManyWithHeaders
{
[FunctionName("KafkaOutputManyWithHeaders")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out KafkaEventData<string>[] eventDataArr,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
eventDataArr = new KafkaEventData<string>[2];
eventDataArr[0] = new KafkaEventData<string>("one");
eventDataArr[0].Headers.Add("test", System.Text.Encoding.UTF8.GetBytes("dotnet"));
eventDataArr[1] = new KafkaEventData<string>("two");
eventDataArr[1].Headers.Add("test1", System.Text.Encoding.UTF8.GetBytes("dotnet"));
return new OkObjectResult("Ok");
}
}
}


@@ -0,0 +1,33 @@
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
namespace Confluent
{
public class KafkaOutputWithHeaders
{
[FunctionName("KafkaOutputWithHeaders")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out KafkaEventData<string> eventData,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.Query["message"];
eventData = new KafkaEventData<string>(message);
eventData.Headers.Add("test", System.Text.Encoding.UTF8.GetBytes("dotnet"));
return new OkObjectResult("Ok");
}
}
}


@@ -0,0 +1,23 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Extensions.Logging;
namespace Confluent
{
public class KafkaTrigger
{
[FunctionName("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string> kevent, ILogger log)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
}
}
}


@@ -0,0 +1,26 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Extensions.Logging;
namespace Confluent
{
public class KafkaTriggerMany
{
[FunctionName("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string>[] events, ILogger log)
{
foreach (KafkaEventData<string> kevent in events)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
}
}
}
}


@@ -0,0 +1,33 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
namespace Confluent
{
public class KafkaTriggerManyWithHeaders
{
[FunctionName("KafkaTriggerManyWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string>[] events, ILogger log)
{
foreach (KafkaEventData<string> eventData in events)
{
log.LogInformation($"C# Kafka trigger function processed a message: {eventData.Value}");
log.LogInformation($"Headers: ");
var headers = eventData.Headers;
foreach (var header in headers)
{
log.LogInformation($"Key = {header.Key} Value = {System.Text.Encoding.UTF8.GetString(header.Value)}");
}
}
}
}
}


@@ -0,0 +1,29 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Extensions.Logging;
namespace Confluent
{
public class KafkaTriggerSingleWithHeaders
{
[FunctionName("KafkaTriggerSingleWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string> kevent, ILogger log)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
log.LogInformation("Headers: ");
var headers = kevent.Headers;
foreach (var header in headers)
{
log.LogInformation($"Key = {header.Key} Value = {System.Text.Encoding.UTF8.GetString(header.Value)}");
}
}
}
}


@@ -0,0 +1,11 @@
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
}
}


@@ -0,0 +1,11 @@
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"BrokerList": "{YOUR_CONFLUENT_CLOUD_BROKER}",
"ConfluentCloudUserName": "{YOUR_CONFLUENT_CLOUD_USERNAME}",
"ConfluentCloudPassword": "{YOUR_CONFLUENT_CLOUD_PASSWORD}",
"topic": "{YOUR_KAFKA_TOPIC_NAME}"
}
}

samples/dotnet/EventHub/.gitignore vendored Normal file

@@ -0,0 +1,264 @@
(contents identical to samples/dotnet-isolated/confluent/.gitignore above)


@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="3.3.3" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.4" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
</None>
</ItemGroup>
</Project>


@@ -0,0 +1,34 @@
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
namespace Eventhub
{
public class KafkaOutput
{
[FunctionName("KafkaOutput")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out string eventData,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.Query["message"];
string responseMessage = "Ok";
eventData = message;
return new OkObjectResult(responseMessage);
}
}
}


@@ -0,0 +1,31 @@
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
namespace Eventhub
{
public class KafkaOutputMany
{
[FunctionName("KafkaOutputMany")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out KafkaEventData<string>[] eventDataArr,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
eventDataArr = new KafkaEventData<string>[2];
eventDataArr[0] = new KafkaEventData<string>("one");
eventDataArr[1] = new KafkaEventData<string>("two");
return new OkObjectResult("Ok");
}
}
}


@@ -0,0 +1,33 @@
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
namespace Eventhub
{
public class KafkaOutputManyWithHeaders
{
[FunctionName("KafkaOutputManyWithHeaders")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out KafkaEventData<string>[] eventDataArr,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
eventDataArr = new KafkaEventData<string>[2];
eventDataArr[0] = new KafkaEventData<string>("one");
eventDataArr[0].Headers.Add("test", System.Text.Encoding.UTF8.GetBytes("dotnet"));
eventDataArr[1] = new KafkaEventData<string>("two");
eventDataArr[1].Headers.Add("test1", System.Text.Encoding.UTF8.GetBytes("dotnet"));
return new OkObjectResult("Ok");
}
}
}


@@ -0,0 +1,33 @@
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
namespace Eventhub
{
public class KafkaOutputWithHeaders
{
[FunctionName("KafkaOutputWithHeaders")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out KafkaEventData<string> eventData,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.Query["message"];
eventData = new KafkaEventData<string>(message);
eventData.Headers.Add("test", System.Text.Encoding.UTF8.GetBytes("dotnet"));
return new OkObjectResult("Ok");
}
}
}


@@ -0,0 +1,23 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Extensions.Logging;
namespace Eventhub
{
public class KafkaTrigger
{
[FunctionName("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string> kevent, ILogger log)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
}
}
}


@@ -0,0 +1,26 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Extensions.Logging;
namespace Eventhub
{
public class KafkaTriggerMany
{
[FunctionName("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string>[] events, ILogger log)
{
foreach (KafkaEventData<string> kevent in events)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
}
}
}
}


@@ -0,0 +1,33 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
namespace Eventhub
{
public class KafkaTriggerManyWithHeaders
{
[FunctionName("KafkaTriggerManyWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string>[] events, ILogger log)
{
foreach (KafkaEventData<string> eventData in events)
{
log.LogInformation($"C# Kafka trigger function processed a message: {eventData.Value}");
log.LogInformation($"Headers: ");
var headers = eventData.Headers;
foreach (var header in headers)
{
log.LogInformation($"Key = {header.Key} Value = {System.Text.Encoding.UTF8.GetString(header.Value)}");
}
}
}
}
}


@@ -0,0 +1,29 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Extensions.Logging;
namespace Eventhub
{
public class KafkaTriggerSingleWithHeaders
{
[FunctionName("KafkaTriggerSingleWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string> kevent, ILogger log)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
log.LogInformation("Headers: ");
var headers = kevent.Headers;
foreach (var header in headers)
{
log.LogInformation($"Key = {header.Key} Value = {System.Text.Encoding.UTF8.GetString(header.Value)}");
}
}
}
}


@@ -0,0 +1,3 @@
{
"version": "2.0"
}


@@ -0,0 +1,10 @@
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"BrokerList": "<YOUR_EVENTHUB_NAMESPACE_NAME>.servicebus.windows.net:9093",
"EventHubConnectionString": "<YOUR_EVENTHUB_CONNECTIONSTRING>",
"topic": "{YOUR_KAFKA_TOPIC_NAME}"
}
}


@@ -5,12 +5,12 @@
<DefaultItemExcludes>**</DefaultItemExcludes>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="3.1.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="3.3.3" />
<PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="1.1.7" />
</ItemGroup>
<ItemGroup>
<!-- <ItemGroup>
<None Update="confluent_cloud_cacert.pem">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</ItemGroup> -->
</Project>


@@ -1,8 +1,9 @@
{
"IsEncrypted": false,
"Values": {
"BrokerList": "YOUR_BROKER_LIST_HERE",
"ConfluentCloudUsername": "YOUR_CONFLUENT_USER_NAME_HERE",
"ConfluentCloudPassword": "YOUR_CONFLUENT_PASSWORD_HERE"
"BrokerList": "YOUR_BROKER_LIST_HERE",
"ConfluentCloudUsername": "YOUR_CONFLUENT_USER_NAME_HERE",
"ConfluentCloudPassword": "YOUR_CONFLUENT_PASSWORD_HERE",
"topic": "{YOUR_KAFKA_TOPIC_NAME}"
}
}


@@ -14,8 +14,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
-    <azure.functions.maven.plugin.version>1.8.0</azure.functions.maven.plugin.version>
-    <azure.functions.java.library.version>1.4.0</azure.functions.java.library.version>
+    <azure.functions.maven.plugin.version>1.18.0</azure.functions.maven.plugin.version>
+    <azure.functions.java.library.version>2.0.0</azure.functions.java.library.version>
<functionAppName>kafka-function-20190419163130420</functionAppName>
<functionAppRegion>westus</functionAppRegion>
<stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory>
@@ -52,16 +52,6 @@
   <dependencyManagement>
     <dependencies>
       <dependency>
-        <groupId>junit</groupId>
-        <artifactId>junit</artifactId>
-        <version>4.13.1</version>
-      </dependency>
-      <dependency>
-        <groupId>org.mockito</groupId>
-        <artifactId>mockito-core</artifactId>
-        <version>2.4.0</version>
-      </dependency>
-      <dependency>
         <groupId>com.microsoft.azure.functions</groupId>
         <artifactId>azure-functions-java-library</artifactId>
@@ -75,17 +65,10 @@
       <groupId>com.microsoft.azure.functions</groupId>
       <artifactId>azure-functions-java-library</artifactId>
     </dependency>
-    <!-- Test -->
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.9.0</version>
     </dependency>
   </dependencies>


@@ -14,8 +14,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
-    <azure.functions.maven.plugin.version>1.8.0</azure.functions.maven.plugin.version>
-    <azure.functions.java.library.version>1.4.0</azure.functions.java.library.version>
+    <azure.functions.maven.plugin.version>1.18.0</azure.functions.maven.plugin.version>
+    <azure.functions.java.library.version>2.0.0</azure.functions.java.library.version>
<functionAppName>kafka-function-20190419163130420</functionAppName>
<functionAppRegion>westus</functionAppRegion>
<stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory>


@@ -14,8 +14,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
-    <azure.functions.maven.plugin.version>1.8.0</azure.functions.maven.plugin.version>
-    <azure.functions.java.library.version>1.4.0</azure.functions.java.library.version>
+    <azure.functions.maven.plugin.version>1.18.0</azure.functions.maven.plugin.version>
+    <azure.functions.java.library.version>2.0.0</azure.functions.java.library.version>
<functionAppName>kafka-function-20190419163130420</functionAppName>
<functionAppRegion>westus</functionAppRegion>
<stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory>


@@ -0,0 +1,31 @@
package com.contoso.kafka;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
import java.util.Optional;
public class KafkaOutputMany {
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
}


@ -0,0 +1,40 @@
package com.contoso.kafka;
import com.microsoft.azure.functions.annotation.*;
import com.contoso.kafka.entity.KafkaEntity;
import com.contoso.kafka.entity.KafkaHeaders;
import com.microsoft.azure.functions.*;
import java.util.Optional;
public class KafkaOutputManyWithHeaders {
@FunctionName("KafkaOutputManyWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
KafkaEntity[] kevents = new KafkaEntity[2];
KafkaHeaders[] headersForEvent1 = new KafkaHeaders[1];
headersForEvent1[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent1 = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", "one", headersForEvent1);
KafkaHeaders[] headersForEvent2 = new KafkaHeaders[1];
headersForEvent2[0] = new KafkaHeaders("test1", "java");
KafkaEntity kevent2 = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", "two", headersForEvent2);
kevents[0] = kevent1;
kevents[1] = kevent2;
output.setValue(kevents);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
}


@ -0,0 +1,36 @@
package com.contoso.kafka;
import com.microsoft.azure.functions.annotation.*;
import com.contoso.kafka.entity.KafkaEntity;
import com.contoso.kafka.entity.KafkaHeaders;
import com.microsoft.azure.functions.*;
import java.util.Optional;
public class KafkaOutputWithHeaders {
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
}


@ -0,0 +1,28 @@
package com.contoso.kafka;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
public class KafkaTriggerMany {
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
}
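With `cardinality = Cardinality.MANY` and `dataType = "string"` set as above, the extension delivers events in batches, so a single invocation receives a `String[]` rather than one event at a time.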


@ -0,0 +1,39 @@
package com.contoso.kafka;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.contoso.kafka.entity.KafkaEntity;
import com.contoso.kafka.entity.KafkaHeaders;
import com.google.gson.Gson;
import com.microsoft.azure.functions.*;
public class KafkaTriggerManyWithHeaders {
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
}


@ -0,0 +1,32 @@
package com.contoso.kafka;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.contoso.kafka.entity.KafkaEntity;
import com.contoso.kafka.entity.KafkaHeaders;
import com.microsoft.azure.functions.*;
public class KafkaTriggerWithHeaders {
@FunctionName("KafkaTriggerWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) KafkaEntity kafkaEventData,
final ExecutionContext context) {
context.getLogger().info("Java Kafka trigger function called for message: " + kafkaEventData.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kafkaEventData.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}


@ -6,23 +6,23 @@ import com.microsoft.azure.functions.*;
import java.util.Optional;
public class FunctionOutput {
public class SampleKafkaOutput {
/**
* This function listens at endpoint "api/KafkaInput-Java" and sends a message to the confluent-topic. Two ways to invoke it using the "curl" command in bash:
* 1. curl -d "HTTP BODY" {your host}/api/KafkaInput-Java
* 2. curl "{your host}/api/KafkaInput-Java?message=hello"
* This function listens at endpoint "api/KafkaOutput" and sends a message to the confluent-topic. Two ways to invoke it using the "curl" command in bash:
* 1. curl -d "HTTP BODY" {your host}/api/KafkaOutput
* 2. curl "{your host}/api/KafkaOutput?message=hello"
* This sample is for a local cluster. Modify topic and brokerList on the @KafkaOutput annotation.
* For the Confluent Cloud example, please refer to the KafkaTrigger-Java-Many in `TriggerFunction.java`.
*/
@FunctionName("KafkaInput-Java")
public HttpResponseMessage input(
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "message",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "%ConfluentCloudPassword%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
@ -35,6 +35,6 @@ public class FunctionOutput {
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Message Sent, " + message).build();
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
}


@ -7,33 +7,30 @@ import com.microsoft.azure.functions.*;
/**
* Azure Functions with HTTP Trigger.
*/
public class TriggerFunction {
public class SampleKafkaTrigger {
/**
* This function consumes KafkaEvents on the Confluent Cloud. Create a local.settings.json or configure AppSettings for configuring
* BrokerList, UserName, and Password. Values wrapped with `%` will be replaced with environment variables.
* For more details, refer https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-expressions-patterns#binding-expressions---app-settings
* The function is a sample of consuming kafkaEvent on batch.
* The function is a sample of consuming kafkaEvent.
* @param kafkaEventData
* @param context
*/
@FunctionName("KafkaTrigger-Java-Many")
public void runMany(
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "message",
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "%ConfluentCloudPassword%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEventData,
) String kafkaEventData,
final ExecutionContext context) {
for (String message: kafkaEventData) {
context.getLogger().info(message);
}
context.getLogger().info(kafkaEventData);
}
}
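When this sample runs, `%BrokerList%` and `%ConfluentCloudUsername%` are resolved from local.settings.json locally and from the Function App settings on Azure, while the `password` attribute names the `ConfluentCloudPassword` app setting directly, which is why it carries no `%` wrapper in these samples.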


@ -0,0 +1,19 @@
package com.contoso.kafka.entity;
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
}


@ -0,0 +1,12 @@
package com.contoso.kafka.entity;
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
}
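For reference, a minimal standalone sketch of how a trigger payload string deserializes into these entity classes with Gson, and how a base64-encoded header value is decoded, mirroring what the KafkaTriggerManyWithHeaders sample does. The class name and payload literal are illustrative; Gson comes from the gson dependency in the sample pom.

```java
package com.contoso.kafka.entity;

import java.util.Base64;
import com.google.gson.Gson;

public class KafkaEntityDemo {
    public static void main(String[] args) {
        // Illustrative payload; the header value "amF2YQ==" is "java" base64-encoded.
        String payload = "{ \"Offset\":364, \"Partition\":0, \"Topic\":\"topic\","
            + " \"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\":\"one\","
            + " \"Headers\":[{ \"Key\":\"test\", \"Value\":\"amF2YQ==\" }] }";
        // Gson maps the JSON keys onto the public fields of KafkaEntity/KafkaHeaders.
        KafkaEntity kevent = new Gson().fromJson(payload, KafkaEntity.class);
        System.out.println("Value: " + kevent.Value);
        for (KafkaHeaders header : kevent.Headers) {
            // Header values arrive base64-encoded and must be decoded before use.
            String decoded = new String(Base64.getDecoder().decode(header.Value));
            System.out.println("Key:" + header.Key + " Value:" + decoded);
        }
    }
}
```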

samples/java/eventhub/.gitignore (vendored normal file, 45 lines)

@ -0,0 +1,45 @@
# Build output
target/
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
# IDE
.idea/
*.iml
# macOS
.DS_Store
# Azure Functions
local.settings.json
bin/
obj/
tmp/
# Java
.classpath
.project
.settings
# Azure Functions Core Tools Temp dir
azure-functions-core-tools


@ -0,0 +1,30 @@
# based on https://github.com/Azure/azure-functions-docker/blob/master/host/2.0/alpine/amd64/java.Dockerfile
ARG BASE_IMAGE=mcr.microsoft.com/azure-functions/base:2.0-alpine
FROM ${BASE_IMAGE} as runtime-image
FROM openjdk:8-jdk-alpine as jdk
RUN mkdir -p /usr/lib/jvm/java-1.8-openjdk
FROM mcr.microsoft.com/dotnet/core/runtime-deps:2.2-alpine
RUN apk add --no-cache libc6-compat libnsl && \
# workaround for https://github.com/grpc/grpc/issues/17255
ln -s /usr/lib/libnsl.so.2 /usr/lib/libnsl.so.1
ENV AzureWebJobsScriptRoot=/home/site/wwwroot \
HOME=/home \
FUNCTIONS_WORKER_RUNTIME=java
COPY --from=runtime-image [ "/azure-functions-host", "/azure-functions-host" ]
COPY --from=runtime-image [ "/workers/java", "/azure-functions-host/workers/java" ]
COPY --from=jdk /usr/lib/jvm/java-1.8-openjdk /usr/lib/jvm/java-1.8-openjdk
ENV JAVA_HOME /usr/lib/jvm/java-1.8-openjdk
# Kafka extension: Librdkafka
RUN apk update && apk add --no-cache librdkafka-dev
# Kafka extension: Adding files to /home/site/wwwroot
COPY . /home/site/wwwroot
CMD [ "/azure-functions-host/Microsoft.Azure.WebJobs.Script.WebHost" ]
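As a quick local smoke test of this image (the tag and port mapping below are illustrative, not part of the sample): `docker build -t kafka-java-sample .` followed by `docker run -p 8080:80 kafka-java-sample`, passing broker settings such as `BrokerList` via `-e` flags.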


@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<WarningsAsErrors></WarningsAsErrors>
<DefaultItemExcludes>**</DefaultItemExcludes>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="3.3.3" />
<PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="1.1.7" />
</ItemGroup>
<!-- <ItemGroup>
<None Update="confluent_cloud_cacert.pem">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup> -->
</Project>


@ -0,0 +1,3 @@
{
"version": "2.0"
}


@ -0,0 +1,10 @@
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "java",
"BrokerList": "<YOUR_EVENTHUB_NAMESPACE_NAME>.servicebus.windows.net:9093",
"EventHubConnectionString": "<YOUR_EVENTHUB_CONNECTIONSTRING>",
"topic": "{YOUR_KAFKA_TOPIC_NAME}"
}
}
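For reference, the Event Hubs connection string placed in `EventHubConnectionString` generally has the form `Endpoint=sb://<YOUR_EVENTHUB_NAMESPACE_NAME>.servicebus.windows.net/;SharedAccessKeyName=<policy-name>;SharedAccessKey=<key>`.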


@ -0,0 +1,193 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.contoso.kafka</groupId>
<artifactId>kafka-function</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Azure Java Functions</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<azure.functions.maven.plugin.version>1.18.0</azure.functions.maven.plugin.version>
<azure.functions.java.library.version>2.0.0</azure.functions.java.library.version>
<functionAppName>kafka-function-20190419163130420</functionAppName>
<functionAppRegion>westus</functionAppRegion>
<stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory>
<functionResourceGroup>java-functions-group</functionResourceGroup>
</properties>
<repositories>
<repository>
<id>maven.snapshots</id>
<name>Maven Central Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>maven.snapshots</id>
<name>Maven Central Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library</artifactId>
<version>${azure.functions.java.library.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-functions-maven-plugin</artifactId>
<version>${azure.functions.maven.plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-functions-maven-plugin</artifactId>
<configuration>
<resourceGroup>${functionResourceGroup}</resourceGroup>
<appName>${functionAppName}</appName>
<region>${functionAppRegion}</region>
<runtime><os>windows</os></runtime>
<pricingTier>EP1</pricingTier>
<appSettings>
<!-- Run Azure TriggerFunction from package file by default -->
<property>
<name>WEBSITE_RUN_FROM_PACKAGE</name>
<value>1</value>
</property>
<property>
<name>FUNCTIONS_EXTENSION_VERSION</name>
<value>~3</value>
</property>
<property>
<name>FUNCTIONS_WORKER_RUNTIME</name>
<value>java</value>
</property>
</appSettings>
</configuration>
<executions>
<execution>
<id>package-functions</id>
<goals>
<goal>package</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<overwrite>true</overwrite>
<outputDirectory>${stagingDirectory}</outputDirectory>
<resources>
<resource>
<directory>${project.basedir}</directory>
<includes>
<include>host.json</include>
<include>local.settings.json</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${stagingDirectory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<excludeArtifactIds>azure-functions-java-library</excludeArtifactIds>
</configuration>
</execution>
</executions>
</plugin>
<!--Remove obj folder generated by .NET SDK in maven clean-->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<filesets>
<fileset>
<directory>obj</directory>
</fileset>
</filesets>
</configuration>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,210 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.contoso.kafka</groupId>
<artifactId>kafka-function</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Azure Java Functions</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<azure.functions.maven.plugin.version>1.18.0</azure.functions.maven.plugin.version>
<azure.functions.java.library.version>2.0.0</azure.functions.java.library.version>
<functionAppName>kafka-function-20190419163130420</functionAppName>
<functionAppRegion>westus</functionAppRegion>
<stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory>
<functionResourceGroup>java-functions-group</functionResourceGroup>
</properties>
<repositories>
<repository>
<id>maven.snapshots</id>
<name>Maven Central Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>maven.snapshots</id>
<name>Maven Central Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library</artifactId>
<version>${azure.functions.java.library.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library</artifactId>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-functions-maven-plugin</artifactId>
<version>${azure.functions.maven.plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-functions-maven-plugin</artifactId>
<configuration>
<resourceGroup>${functionResourceGroup}</resourceGroup>
<appName>${functionAppName}</appName>
<region>${functionAppRegion}</region>
<runtime><os>linux</os></runtime>
<pricingTier>EP1</pricingTier>
<appSettings>
<!-- Run Azure TriggerFunction from package file by default -->
<property>
<name>WEBSITE_RUN_FROM_PACKAGE</name>
<value>1</value>
</property>
<property>
<name>FUNCTIONS_EXTENSION_VERSION</name>
<value>~3</value>
</property>
<property>
<name>FUNCTIONS_WORKER_RUNTIME</name>
<value>java</value>
</property>
</appSettings>
</configuration>
<executions>
<execution>
<id>package-functions</id>
<goals>
<goal>package</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<overwrite>true</overwrite>
<outputDirectory>${stagingDirectory}</outputDirectory>
<resources>
<resource>
<directory>${project.basedir}</directory>
<includes>
<include>host.json</include>
<include>local.settings.json</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${stagingDirectory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<excludeArtifactIds>azure-functions-java-library</excludeArtifactIds>
</configuration>
</execution>
</executions>
</plugin>
<!--Remove obj folder generated by .NET SDK in maven clean-->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<filesets>
<fileset>
<directory>obj</directory>
</fileset>
</filesets>
</configuration>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,210 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.contoso.kafka</groupId>
<artifactId>kafka-function</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Azure Java Functions</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<azure.functions.maven.plugin.version>1.18.0</azure.functions.maven.plugin.version>
<azure.functions.java.library.version>2.0.0</azure.functions.java.library.version>
<functionAppName>kafka-function-20190419163130420</functionAppName>
<functionAppRegion>westus</functionAppRegion>
<stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory>
<functionResourceGroup>java-functions-group</functionResourceGroup>
</properties>
<repositories>
<repository>
<id>maven.snapshots</id>
<name>Maven Central Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>maven.snapshots</id>
<name>Maven Central Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library</artifactId>
<version>${azure.functions.java.library.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library</artifactId>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-functions-maven-plugin</artifactId>
<version>${azure.functions.maven.plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-functions-maven-plugin</artifactId>
<configuration>
<resourceGroup>${functionResourceGroup}</resourceGroup>
<appName>${functionAppName}</appName>
<region>${functionAppRegion}</region>
<runtime><os>windows</os></runtime>
<pricingTier>EP1</pricingTier>
<appSettings>
<!-- Run Azure TriggerFunction from package file by default -->
<property>
<name>WEBSITE_RUN_FROM_PACKAGE</name>
<value>1</value>
</property>
<property>
<name>FUNCTIONS_EXTENSION_VERSION</name>
<value>~3</value>
</property>
<property>
<name>FUNCTIONS_WORKER_RUNTIME</name>
<value>java</value>
</property>
</appSettings>
</configuration>
<executions>
<execution>
<id>package-functions</id>
<goals>
<goal>package</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<overwrite>true</overwrite>
<outputDirectory>${stagingDirectory}</outputDirectory>
<resources>
<resource>
<directory>${project.basedir}</directory>
<includes>
<include>host.json</include>
<include>local.settings.json</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${stagingDirectory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<excludeArtifactIds>azure-functions-java-library</excludeArtifactIds>
</configuration>
</execution>
</executions>
</plugin>
<!--Remove obj folder generated by .NET SDK in maven clean-->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<filesets>
<fileset>
<directory>obj</directory>
</fileset>
</filesets>
</configuration>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,31 @@
package com.contoso.kafka;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
import java.util.Optional;
public class KafkaOutputMany {
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
}


@ -0,0 +1,38 @@
package com.contoso.kafka;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
import java.util.Optional;
public class KafkaOutputManyWithHeaders {
@FunctionName("KafkaOutputManyWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
KafkaEntity[] kevents = new KafkaEntity[2];
KafkaHeaders[] headersForEvent1 = new KafkaHeaders[1];
headersForEvent1[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent1 = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", "one", headersForEvent1);
KafkaHeaders[] headersForEvent2 = new KafkaHeaders[1];
headersForEvent2[0] = new KafkaHeaders("test1", "java");
KafkaEntity kevent2 = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", "two", headersForEvent2);
kevents[0] = kevent1;
kevents[1] = kevent2;
output.setValue(kevents);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
}


@ -0,0 +1,34 @@
package com.contoso.kafka;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
import java.util.Optional;
public class KafkaOutputWithHeaders {
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username= "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
}


@ -0,0 +1,28 @@
package com.contoso.kafka;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
public class KafkaTriggerMany {
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
}


@ -0,0 +1,37 @@
package com.contoso.kafka;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.google.gson.Gson;
import com.microsoft.azure.functions.*;
public class KafkaTriggerManyWithHeaders {
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
}


@ -0,0 +1,30 @@
package com.contoso.kafka;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
public class KafkaTriggerWithHeaders {
@FunctionName("KafkaTriggerWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username= "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) KafkaEntity kafkaEventData,
final ExecutionContext context) {
context.getLogger().info("Java Kafka trigger function called for message: " + kafkaEventData.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kafkaEventData.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}


@ -0,0 +1,38 @@
package com.contoso.kafka;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
import java.util.Optional;
public class SampleKafkaOutput {
/**
* This function listens at endpoint "api/KafkaOutput" and sends a Kafka message to the Event Hub. Two ways to invoke it using the "curl" command in bash:
* 1. curl -d "HTTP BODY" {your host}/api/KafkaOutput
* 2. curl "{your host}/api/KafkaOutput?message=hello"
*/
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
}


@ -0,0 +1,36 @@
package com.contoso.kafka;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
/**
* Azure Functions with HTTP Trigger.
*/
public class SampleKafkaTrigger {
/**
* This function consumes KafkaEvents from the Event Hubs Kafka endpoint. Create a local.settings.json or configure AppSettings for configuring
* BrokerList, UserName, and Password. Values wrapped with `%` will be replaced with environment variables.
* For more details, refer https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-expressions-patterns#binding-expressions---app-settings
* The function is a sample of consuming kafkaEvent.
* @param kafkaEventData
* @param context
*/
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
}


@ -0,0 +1,19 @@
package com.contoso.kafka;
public class KafkaEntity {
int Offset;
int Partition;
String Timestamp;
String Topic;
String Value;
KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
}


@ -0,0 +1,12 @@
package com.contoso.kafka;
public class KafkaHeaders{
String Key;
String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
}


@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library-kafka</artifactId>
<version>1.0.0</version>
<description>POM was created from install:install-file</description>
</project>


@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<metadata>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library-kafka</artifactId>
<versioning>
<release>1.0.0</release>
<versions>
<version>1.0.0</version>
</versions>
<lastUpdated>20200708041508</lastUpdated>
</versioning>
</metadata>


@ -0,0 +1,29 @@
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}


@ -0,0 +1,29 @@
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "$ConnectionString",
"password": "EventHubConnectionString",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}


@ -0,0 +1,12 @@
// This sample will create the topic "topic" and send a message to it.
// KafkaTrigger will be triggered.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}


@ -0,0 +1,29 @@
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessages",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}


@ -0,0 +1,29 @@
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessages",
"brokerList": "BrokerList",
"topic": "topic",
"username": "$ConnectionString",
"password": "EventHubConnectionString",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}


@ -0,0 +1,11 @@
// This sample will create the topic "topic" and send a message to it.
// KafkaTrigger will be triggered.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}


@ -0,0 +1,30 @@
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out",
"dataType": "string"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}


@ -0,0 +1,30 @@
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "$ConnectionString",
"password": "EventHubConnectionString",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out",
"dataType": "string"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}


@ -0,0 +1,14 @@
// This sample will create the topic "topic" and send a message to it.
// KafkaTrigger will be triggered.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const responseMessage = 'Ok'
context.bindings.outputKafkaMessage = ["{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"one\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }",
"{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"two\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"]
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}


@ -0,0 +1,30 @@
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out",
"dataType": "string"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}


@ -0,0 +1,30 @@
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "$ConnectionString",
"password": "EventHubConnectionString",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out",
"dataType": "string"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}


@ -0,0 +1,15 @@
// This sample will create the topic "topic" and send a message to it.
// KafkaTrigger will be triggered.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}


@ -4,14 +4,14 @@
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"topic": "users",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup" : "functions",
"dataType": "binary"
"consumerGroup" : "$Default",
"dataType": "string"
}
]
}


@ -0,0 +1,17 @@
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "$ConnectionString",
"password": "EventHubConnectionString",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup" : "$Default",
"dataType": "string"
}
]
}


@ -0,0 +1,4 @@
module.exports = async function (context, event) {
// context.log.info(event)
context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};


@ -7,7 +7,7 @@
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "message",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",


@ -0,0 +1,18 @@
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "events",
"direction": "in",
"protocol" : "SASLSSL",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username": "$ConnectionString",
"password": "EventHubConnectionString",
"brokerList" : "%BrokerList%"
}
]
}


@ -0,0 +1,7 @@
module.exports = async function (context, events) {
function print(event) {
var eventJson = JSON.parse(event)
context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
}
events.map(print);
};


@ -0,0 +1,18 @@
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup": "$Default",
"dataType": "string",
"cardinality" : "MANY"
}
]
}


@ -0,0 +1,18 @@
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"brokerList": "BrokerList",
"topic": "topic",
"username": "$ConnectionString",
"password": "EventHubConnectionString",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup": "$Default",
"dataType": "string",
"cardinality" : "MANY"
}
]
}


@ -0,0 +1,12 @@
module.exports = async function (context, event) {
function print(kevent) {
var keventJson = JSON.parse(kevent)
context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
context.log.info(`Headers for this message:`)
let headers = keventJson.Headers;
headers.forEach(element => {
context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`)
});
}
event.map(print);
};


@ -0,0 +1,16 @@
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"brokerList": "BrokerList",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "saslSsl",
"topic": "topic",
"consumerGroup": "$Default",
"dataType": "string"
}
]
}
Some files were not shown because too many files changed in this diff.