Merge pull request #398 from skaarthik/examples

adding example to process Kafka messages using Spark Streaming C# app
This commit is contained in:
Kaarthik Sivashanmugam 2016-04-20 16:48:40 -07:00
Родитель a73ec14c10 e905ed41d3
Коммит 5d289f2e72
21 изменённых файлов: 378 добавлений и 29 удалений

Просмотреть файл

@ -50,6 +50,35 @@ maxLatencyByDcDataFrame.ShowSchema();
maxLatencyByDcDataFrame.Show();
```
A simple Spark Streaming application that processes messages from Kafka using C# may be implemented using the following code:
``` c#
StreamingContext sparkStreamingContext = StreamingContext.GetOrCreate(checkpointPath, () =>
{
var ssc = new StreamingContext(sparkContext, slideDurationInMillis);
ssc.Checkpoint(checkpointPath);
var stream = KafkaUtils.CreateDirectStream(ssc, topicList, kafkaParams, perTopicPartitionKafkaOffsets);
//message format: [timestamp],[loglevel],[logmessage]
var countByLogLevelAndTime = stream
.Map(kvp => Encoding.UTF8.GetString(kvp.Value))
.Filter(line => line.Contains(","))
.Map(line => line.Split(','))
.Map(columns => new KeyValuePair<string, int>(
string.Format("{0},{1}", columns[0], columns[1]), 1))
.ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y,
windowDurationInSecs, slideDurationInSecs, 3)
.Map(logLevelCountPair => string.Format("{0},{1}",
logLevelCountPair.Key, logLevelCountPair.Value));
countByLogLevelAndTime.ForeachRDD(countByLogLevel =>
{
foreach (var logCount in countByLogLevel.Collect())
Console.WriteLine(logCount);
});
return ssc;
});
sparkStreamingContext.Start();
sparkStreamingContext.AwaitTermination();
```
Refer to [Mobius\csharp\Samples](csharp/Samples) directory and [sample usage](csharp/Samples/Microsoft.Spark.CSharp/samplesusage.md) for complete samples.
## API Documentation
@ -90,7 +119,7 @@ Note: Refer to [linux-compatibility.md](notes/linux-compatibility.md) for using
## Supported Spark Versions
Mobius is built and tested with [Spark 1.4.1](https://github.com/Microsoft/Mobius/tree/branch-1.4), [Spark 1.5.2](https://github.com/Microsoft/Mobius/tree/branch-1.5) and [Spark 1.6.0](https://github.com/Microsoft/Mobius/tree/master).
Mobius is built and tested with [Spark 1.4.1](https://github.com/Microsoft/Mobius/tree/branch-1.4), [Spark 1.5.2](https://github.com/Microsoft/Mobius/tree/branch-1.5) and [Spark 1.6.*](https://github.com/Microsoft/Mobius/tree/branch-1.6).
## Releases
@ -117,6 +146,6 @@ Mobius is licensed under the MIT license. See [LICENSE](LICENSE) file for full l
* Options to ask your question to the Mobius community
* create issue on [GitHub](https://github.com/Microsoft/Mobius)
* create post with "sparkclr" tag in [Stack Overflow](https://stackoverflow.com/questions/tagged/sparkclr)
* send email to sparkclr-user@googlegroups.com
* join chat at [Mobius room in Gitter](https://gitter.im/Microsoft/Mobius)
* tweet [@MobiusForSpark](http://twitter.com/MobiusForSpark)
* send email to sparkclr-user@googlegroups.com

Просмотреть файл

@ -40,7 +40,7 @@
<!--** Command to launch CSharpBackend in debug mode is "sparkclr-submit.cmd debug" ** -->
<!--********************************************************************************************************-->
<!--
<add key="CSharpWorkerPath" value="C:\Git\SparkCLR\examples\Pi\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpWorkerPath" value="C:\Git\Mobius\examples\Pi\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpBackendPortNumber" value="0"/>
-->
</appSettings>

Просмотреть файл

@ -45,7 +45,7 @@
<!--********************************************************************************************************-->
<!--
<add key="CSharpWorkerPath" value="C:\Git\SparkCLR\examples\Pi\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpWorkerPath" value="C:\Git\Mobius\examples\Pi\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpBackendPortNumber" value="0"/>
-->
</appSettings>

Просмотреть файл

@ -9,10 +9,10 @@ using System.Runtime.InteropServices;
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("SparkCLR-Pi")]
[assembly: AssemblyDescription("SparkCLR Pi example.")]
[assembly: AssemblyDescription("Mobius Pi example.")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Microsoft Corporation")]
[assembly: AssemblyProduct("Microsoft SparkCLR")]
[assembly: AssemblyProduct("Microsoft Mobius")]
[assembly: AssemblyCopyright("Copyright © Microsoft Corporation 2015")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

Просмотреть файл

@ -21,6 +21,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pi", "Batch\Pi\Pi.csproj",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WordCount", "Batch\WordCount\WordCount.csproj", "{17E4C27F-8441-425A-B82B-23BA5E313CC4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka", "Streaming\Kafka\Kafka.csproj", "{8764EAAA-9D32-4549-A64F-C7C89B014EA6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -51,6 +53,10 @@ Global
{17E4C27F-8441-425A-B82B-23BA5E313CC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{17E4C27F-8441-425A-B82B-23BA5E313CC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{17E4C27F-8441-425A-B82B-23BA5E313CC4}.Release|Any CPU.Build.0 = Release|Any CPU
{8764EAAA-9D32-4549-A64F-C7C89B014EA6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8764EAAA-9D32-4549-A64F-C7C89B014EA6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8764EAAA-9D32-4549-A64F-C7C89B014EA6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8764EAAA-9D32-4549-A64F-C7C89B014EA6}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -62,5 +68,6 @@ Global
{DF840BFB-B3A3-493D-B667-4CF21ADBFCAD} = {6F90310A-2DA2-4E81-A062-8D8A9F47C25B}
{913E6A56-9839-4379-8B3C-855BA9341663} = {AE001E84-471E-4D02-BDDE-40B85915CEAE}
{17E4C27F-8441-425A-B82B-23BA5E313CC4} = {AE001E84-471E-4D02-BDDE-40B85915CEAE}
{8764EAAA-9D32-4549-A64F-C7C89B014EA6} = {6F90310A-2DA2-4E81-A062-8D8A9F47C25B}
EndGlobalSection
EndGlobal

Просмотреть файл

@ -27,7 +27,7 @@
<!--********************************************************************************************************-->
<!--
<add key="CSharpWorkerPath" value="C:\Git\SparkCLR\examples\JdbcDataFrame\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpWorkerPath" value="C:\Git\Mobius\examples\JdbcDataFrame\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpBackendPortNumber" value="0"/>
-->
</appSettings>

Просмотреть файл

@ -9,10 +9,10 @@ using System.Runtime.InteropServices;
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("SparkCLRJdbcDataFrameExample")]
[assembly: AssemblyDescription("JDBC Example for DataFrame API in SparkCLR")]
[assembly: AssemblyDescription("JDBC Example for DataFrame API in Mobius")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Microsoft Corporation")]
[assembly: AssemblyProduct("Microsoft SparkCLR")]
[assembly: AssemblyProduct("Microsoft Mobius")]
[assembly: AssemblyCopyright("Copyright © Microsoft Corporation 2015")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

Просмотреть файл

@ -28,7 +28,7 @@
<!--********************************************************************************************************-->
<!--
<add key="CSharpWorkerPath" value="C:\Git\SparkCLR\examples\SparkXml\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpWorkerPath" value="C:\Git\Mobius\examples\SparkXml\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpBackendPortNumber" value="0"/>
-->
</appSettings>

Просмотреть файл

@ -9,10 +9,10 @@ using System.Runtime.InteropServices;
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("SparkCLRXmlExample")]
[assembly: AssemblyDescription("XML processing Example for DataFrame API in SparkCLR")]
[assembly: AssemblyDescription("XML processing Example for DataFrame API in Mobius")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Microsoft Corporation")]
[assembly: AssemblyProduct("Microsoft SparkCLR")]
[assembly: AssemblyProduct("Microsoft Mobius")]
[assembly: AssemblyCopyright("Copyright © Microsoft Corporation 2015")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

Просмотреть файл

@ -27,7 +27,7 @@
<!--********************************************************************************************************-->
<!--
<add key="CSharpWorkerPath" value="C:\Git\SparkCLR\examples\EventHub\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpWorkerPath" value="C:\Git\Mobius\examples\EventHub\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpBackendPortNumber" value="0"/>
-->

Просмотреть файл

@ -71,9 +71,9 @@ namespace Microsoft.Spark.CSharp.Examples
Thread.Sleep(Timeout.Infinite);
}
/***************************************************************************************************************************************************
* ********* To publish samples events to EventHubs, uncomment package references for EventHub in packages.config uncomment the following line **** *
***************************************************************************************************************************************************/
/*********************************************************************************************************************************************************
* ********* To publish samples events to EventHubs, uncomment package references for EventHub in packages.config and uncomment the following method **** *
**********************************************************************************************************************************************************/
/*
public static void Main(string[] args)
{

Просмотреть файл

@ -11,7 +11,7 @@ namespace Microsoft.Spark.CSharp.Examples
{
/// <summary>
/// Sample SparkCLR application that processes events from EventHub in the format [timestamp],[loglevel],[logmessage]
/// EventPublisher class may be used to publish sample events to consume
/// EventPublisher class may be used to publish sample events to EventHubs to consume in this app
/// </summary>
class SparkCLREventHubsExample
{
@ -29,30 +29,31 @@ namespace Microsoft.Spark.CSharp.Examples
{"eventhubs.checkpoint.dir", "<hdfs path to eventhub checkpoint dir>"},
{"eventhubs.checkpoint.interval", "<interval>"},
};
const int windowDurationInSecs = 5;
const int slideDurationInSecs = 5;
const string checkpointPath = "<hdfs path to spark checkpoint dir>";
//const string outputPath = "<hdfs path to output dir>";
const long slideDuration = 5000;
const long slideDurationInMillis = 5000;
StreamingContext sparkStreamingContext = StreamingContext.GetOrCreate(checkpointPath,
() =>
{
var ssc = new StreamingContext(sparkContext, slideDuration);
var ssc = new StreamingContext(sparkContext, slideDurationInMillis);
ssc.Checkpoint(checkpointPath);
var stream = EventHubsUtils.CreateUnionStream(ssc, eventhubsParams);
var countByLogLevelAndTime = stream
.Map(bytes => Encoding.UTF8.GetString(bytes))
.Filter(s => s.Contains(","))
.Filter(line => line.Contains(","))
.Map(line => line.Split(','))
.Map(columns => new KeyValuePair<string, int>(string.Format("{0},{1}", columns[0], columns[1]), 1))
.ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, 5, 5, 3)
.Map(kvp => string.Format("{0},{1}", kvp.Key, kvp.Value));
.ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, windowDurationInSecs, slideDurationInSecs, 3)
.Map(logLevelCountPair => string.Format("{0},{1}", logLevelCountPair.Key, logLevelCountPair.Value));
countByLogLevelAndTime.ForeachRDD(dimensionalCount =>
countByLogLevelAndTime.ForeachRDD(countByLogLevel =>
{
//dimensionalCount.SaveAsTextFile(string.Format("{0}/{1}", outputPath, Guid.NewGuid()));
var dimensionalCountCollection = dimensionalCount.Collect();
var dimensionalCountCollection = countByLogLevel.Collect();
foreach (var dimensionalCountItem in dimensionalCountCollection)
{
Console.WriteLine(dimensionalCountItem);

Просмотреть файл

@ -6,10 +6,10 @@ using System.Runtime.InteropServices;
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("SparkCLREventHub")]
[assembly: AssemblyDescription("Example for processing EventHub events using DStream API in SparkCLR")]
[assembly: AssemblyDescription("Example for processing EventHub events using DStream API in Mobius")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Microsoft Corporation")]
[assembly: AssemblyProduct("Microsoft SparkCLR")]
[assembly: AssemblyProduct("Microsoft Mobius")]
[assembly: AssemblyCopyright("Copyright © Microsoft Corporation 2015")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

Просмотреть файл

@ -27,7 +27,7 @@
<!--********************************************************************************************************-->
<!--
<add key="CSharpWorkerPath" value="C:\Git\SparkCLR\examples\JdbcDataFrame\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpWorkerPath" value="C:\Git\Mobius\examples\HdfsWordCount\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpBackendPortNumber" value="0"/>
-->
</appSettings>

Просмотреть файл

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net" />
</configSections>
<log4net>
<root>
<level value="DEBUG" />
<appender-ref ref="ConsoleAppender" />
</root>
<appender name="ConsoleAppender" type="log4net.Appender.ConsoleAppender">
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="[%date] [%thread] [%-5level] [%logger] - %message%newline" />
</layout>
</appender>
</log4net>
<appSettings>
<!--********************************************************************************************************-->
<!--** Uncomment the following settings to run Spark driver executable in **local** or **debug** modes ** -->
<!--** In debug mode, the driver is not launched by CSharpRunner but launched from VS or command prompt not configured for SparkCLR ** -->
<!--** CSharpBackend should be launched in debug mode as well and the port number from that should be used below ** -->
<!--** Command to launch CSharpBackend in debug mode is "sparkclr-submit.cmd debug" ** -->
<!--********************************************************************************************************-->
<!--
<add key="CSharpWorkerPath" value="C:\Git\Mobius\examples\Kafka\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpBackendPortNumber" value="0"/>
-->
</appSettings>
</configuration>

Просмотреть файл

@ -0,0 +1,80 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{8764EAAA-9D32-4549-A64F-C7C89B014EA6}</ProjectGuid>
<OutputType>Exe</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Microsoft.Spark.CSharp.Examples</RootNamespace>
<AssemblyName>SparkClrKafka</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="CSharpWorker">
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.000-PREVIEW-2\lib\net45\CSharpWorker.exe</HintPath>
</Reference>
<Reference Include="log4net, Version=1.2.10.0, Culture=neutral, PublicKeyToken=1b44e1d426115821, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\log4net.2.0.5\lib\net45-full\log4net.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Spark.CSharp.Adapter">
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.000-PREVIEW-2\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=4.5.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="Razorvine.Pyrolite">
<HintPath>..\..\packages\Razorvine.Pyrolite.4.10.0.0\lib\net40\Razorvine.Pyrolite.dll</HintPath>
</Reference>
<Reference Include="Razorvine.Serpent">
<HintPath>..\..\packages\Razorvine.Serpent.1.12.0.0\lib\net40\Razorvine.Serpent.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="MessagePublisher.cs" />
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="App.config" />
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

Просмотреть файл

@ -0,0 +1,75 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Threading;
/*
using Kafka.Client.Cfg;
using Kafka.Client.Messages;
using Kafka.Client.Producers;
*/
namespace Microsoft.Spark.CSharp.Examples
{
/// <summary>
/// Publishes Messages that can be used with SparkClrKafka example.
/// Messages published are string values in the format: [timestamp],[loglevel],[logmessage]
/// </summary>
class MessagePublisher
{
/***************************************************************************************************************
* ********* To publish samples messages to Kafka, uncomment the commented out lines in this file and ********* *
* ********* add reference to ZookeeperNet.dll (https://www.nuget.org/packages/ZooKeeper.Net/) and ************ *
* ********* KafkaNet.dll (https://github.com/Microsoft/CSharpClient-for-Kafka) ****************************** *
****************************************************************************************************************/
private static readonly string[] LogLevels = { "Info", "Debug", "Error", "Fatal" };
private static readonly Random RandomLogLevel = new Random();
private static long batchIndex = 1;
private static void SendMessageBatch(object state)
{
/*
var kafkaProducerConfig = new ProducerConfiguration(new List<BrokerConfiguration>
{
new BrokerConfiguration {Host = "localhost", Port = 9092}
});
var kafkaProducer = new Producer(kafkaProducerConfig);
*/
var topicName = "<kafkaTopicName>";
var now = DateTime.Now.Ticks;
for (int messageIndex = 1; messageIndex <= 5; messageIndex++)
{
try
{
var message = string.Format("{0},{1},{2}", now, LogLevels[RandomLogLevel.Next(4)], "LogMessage id " + Guid.NewGuid());
//kafkaProducer.Send(new ProducerData<string, Message>(topicName, new Message(Encoding.UTF8.GetBytes(message))));
}
catch (Exception exception)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("{0} > Exception in message {3} in batch {2}: {1}", DateTime.Now, exception.Message, batchIndex, messageIndex);
Console.ResetColor();
}
}
Console.WriteLine("Completed sending all messages in batch {0}", batchIndex++);
}
public static void Publish()
{
var timer = new Timer(SendMessageBatch, null, 0, 60000);
Thread.Sleep(Timeout.Infinite);
}
/*
public static void Main(string[] args)
{
Publish();
}
*/
}
}

Просмотреть файл

@ -0,0 +1,70 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Streaming;
namespace Microsoft.Spark.CSharp.Examples
{
/// <summary>
/// Sample SparkCLR application that processes events from Kafka in the format [timestamp],[loglevel],[logmessage]
/// MessagePublisher class may be used to publish sample messages to Kafka to consume in this app
/// </summary>
class SparkClrKafkaExample
{
static void Main(string[] args)
{
var sparkContext = new SparkContext(new SparkConf().SetAppName("SparkCLRKafka Example"));
const string topicName = "<topicName>";
var topicList = new List<string> {topicName};
var kafkaParams = new Dictionary<string, string> //refer to http://kafka.apache.org/documentation.html#configuration
{
{"metadata.broker.list", "<kafka brokers list>"},
{"auto.offset.reset", "smallest"}
};
var perTopicPartitionKafkaOffsets = new Dictionary<string, long>();
const int windowDurationInSecs = 5;
const int slideDurationInSecs = 5;
const string checkpointPath = "<hdfs path to spark checkpoint directory>";
const string appOutputPath = "<hdfs path to app output directory>";
const long slideDurationInMillis = 5000;
StreamingContext sparkStreamingContext = StreamingContext.GetOrCreate(checkpointPath,
() =>
{
var ssc = new StreamingContext(sparkContext, slideDurationInMillis);
ssc.Checkpoint(checkpointPath);
var stream = KafkaUtils.CreateDirectStream(ssc, topicList, kafkaParams, perTopicPartitionKafkaOffsets);
var countByLogLevelAndTime = stream
.Map(kvp => Encoding.UTF8.GetString(kvp.Value))
.Filter(line => line.Contains(","))
.Map(line => line.Split(','))
.Map(columns => new KeyValuePair<string, int>(string.Format("{0},{1}", columns[0], columns[1]), 1))
.ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, windowDurationInSecs, slideDurationInSecs, 3)
.Map(logLevelCountPair => string.Format("{0},{1}", logLevelCountPair.Key, logLevelCountPair.Value));
countByLogLevelAndTime.ForeachRDD(countByLogLevel =>
{
countByLogLevel.SaveAsTextFile(string.Format("{0}/{1}", appOutputPath, Guid.NewGuid()));
foreach (var logCount in countByLogLevel.Collect())
{
Console.WriteLine(logCount);
}
});
return ssc;
});
sparkStreamingContext.Start();
sparkStreamingContext.AwaitTermination();
}
}
}

Просмотреть файл

@ -0,0 +1,33 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("SparkclrKafka")]
[assembly: AssemblyDescription("Example for processing Kafka events using DStream API in Mobius")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Microsoft Corporation")]
[assembly: AssemblyProduct("Microsoft Mobius")]
[assembly: AssemblyCopyright("Copyright © Microsoft Corporation 2015")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

Просмотреть файл

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="log4net" version="2.0.5" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="1.6.000-PREVIEW-2" targetFramework="net45" />
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
<package id="Razorvine.Pyrolite" version="4.10.0.0" targetFramework="net45" />
<package id="Razorvine.Serpent" version="1.12.0.0" targetFramework="net45" />
</packages>

Просмотреть файл

@ -80,7 +80,13 @@ The instructions above cover running Mobius applications in Windows. With the fo
* Instead of `RunSamples.cmd`, use `run-samples.sh`
* Instead of `sparkclr-submit.cmd`, use `sparkclr-submit.sh`
## Running Examples in Local Mode
## Running Mobius Examples in Local Mode
| Type | Examples |
| ------------- |--------------|
| Batch | <ul><li>[Pi](#pi-example-batch)</li><li>[Word Count](#wordcount-example-batch)</li></ul> |
| SQL | <ul><li>[JDBC](#jdbc-example-sql)</li><li>[Spark-XML](#spark-xml-example-sql)</li></ul> |
| Streaming | <ul><li>[Kafka](#kafka-example-streaming)</li><li>[EventHubs](#eventhubs-example-streaming)</li><li>[HDFS Word Count](#hdfswordcount-example-streaming)</li></ul> |
The following sample commands show how to run Mobius examples in local mode. Using the instruction above, the following sample commands can be tweaked to run in other modes
### Pi Example (Batch)
@ -123,3 +129,8 @@ Note that all the dependencies listed above are available in maven that can be d
* Run `sparkclr-submit.cmd --exe SparkClrHdfsWordCount.exe C:\Git\Mobius\examples\Streaming\HdfsWordCount\bin\Debug <checkpoint directory> <input directory>`
Counts words in new text files created in the given directory using Mobius streaming.
### Kafka Example (Streaming)
* Publish sample messages to Kafka to be used in this example using MessagePublisher class (remember to include reference to KafkaNet library (https://github.com/Microsoft/CSharpClient-for-Kafka), connection parameters to Kafka and uncomment commented out statements to build and use MessagePublisher)
* Update Kafka parameters in SparkClrKafkaExample implementation and build
* `sparkclr-submit.cmd --master local[4] --conf spark.local.dir=d:\temp --exe SparkClrKafka.exe C:\Git\Mobius\examples\Streaming\Kafka\bin\Debug`