Cassandra example and other minor updates

Kaarthik Sivashanmugam 2016-06-13 17:43:51 -07:00 committed by GitHub
Parent a7f1360b7a
Commit 6cecf1c7b2
12 changed files with 292 additions and 9 deletions

View file

@@ -9,7 +9,7 @@ using Microsoft.Spark.CSharp.Services;
namespace Microsoft.Spark.CSharp.Examples
{
/// <summary>
/// SparkCLR Pi example
/// Mobius Pi example
/// Calculate Pi
/// Reference: https://github.com/apache/spark/blob/branch-1.5/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
/// </summary>

View file

@@ -1,7 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 2013
VisualStudioVersion = 12.0.30501.0
# Visual Studio 14
VisualStudioVersion = 14.0.25123.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HdfsWordCount", "Streaming\HdfsWordCount\HdfsWordCount.csproj", "{6A2C7CF9-D64E-490D-9841-269EE14F7932}"
EndProject
@@ -27,6 +26,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka", "Streaming\Kafka\Ka
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HiveDataFrame", "Sql\HiveDataFrame\HiveDataFrame.csproj", "{5C97498A-C4DB-43DD-86AD-4E50DEE8D405}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CassandraDataFrame", "Sql\CassandraDataFrame\CassandraDataFrame.csproj", "{9FCC75C4-347F-44E8-9B07-C5273066DF9C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -69,6 +70,10 @@ Global
{5C97498A-C4DB-43DD-86AD-4E50DEE8D405}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5C97498A-C4DB-43DD-86AD-4E50DEE8D405}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5C97498A-C4DB-43DD-86AD-4E50DEE8D405}.Release|Any CPU.Build.0 = Release|Any CPU
{9FCC75C4-347F-44E8-9B07-C5273066DF9C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9FCC75C4-347F-44E8-9B07-C5273066DF9C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9FCC75C4-347F-44E8-9B07-C5273066DF9C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9FCC75C4-347F-44E8-9B07-C5273066DF9C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -83,5 +88,6 @@ Global
{17B63D32-EFC8-4EF5-831A-197A4FC29F06} = {AE001E84-471E-4D02-BDDE-40B85915CEAE}
{8764EAAA-9D32-4549-A64F-C7C89B014EA6} = {6F90310A-2DA2-4E81-A062-8D8A9F47C25B}
{5C97498A-C4DB-43DD-86AD-4E50DEE8D405} = {28600A86-E011-41C9-AB41-591580EDB9F1}
{9FCC75C4-347F-44E8-9B07-C5273066DF9C} = {28600A86-E011-41C9-AB41-591580EDB9F1}
EndGlobalSection
EndGlobal

View file

@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net" />
</configSections>
<log4net>
<root>
<level value="DEBUG" />
<appender-ref ref="ConsoleAppender" />
</root>
<appender name="ConsoleAppender" type="log4net.Appender.ConsoleAppender">
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="[%date] [%thread] [%-5level] [%logger] - %message%newline" />
</layout>
</appender>
</log4net>
<appSettings>
<!--********************************************************************************************************-->
<!--** Uncomment the following settings to run Spark driver executable in **local** or **debug** modes ** -->
<!--** In debug mode, the driver is not launched by CSharpRunner but is launched from VS or a command prompt not configured for SparkCLR ** -->
<!--** CSharpBackend should be launched in debug mode as well and the port number from that should be used below ** -->
<!--** Command to launch CSharpBackend in debug mode is "sparkclr-submit.cmd debug" ** -->
<!--********************************************************************************************************-->
<!--
<add key="CSharpWorkerPath" value="C:\Git\Mobius\examples\CassandraDataFrame\bin\Debug\CSharpWorker.exe"/>
<add key="CSharpBackendPortNumber" value="0"/>
-->
</appSettings>
</configuration>

View file

@@ -0,0 +1,81 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{9FCC75C4-347F-44E8-9B07-C5273066DF9C}</ProjectGuid>
<OutputType>Exe</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>CassandraDataFrame</RootNamespace>
<AssemblyName>CassandraDataFrame</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="CSharpWorker">
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.100\lib\net45\CSharpWorker.exe</HintPath>
</Reference>
<Reference Include="log4net">
<HintPath>..\..\packages\log4net.2.0.5\lib\net45-full\log4net.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Spark.CSharp.Adapter">
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.100\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=4.5.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="Razorvine.Pyrolite">
<HintPath>..\..\packages\Razorvine.Pyrolite.4.10.0.0\lib\net40\Razorvine.Pyrolite.dll</HintPath>
</Reference>
<Reference Include="Razorvine.Serpent">
<HintPath>..\..\packages\Razorvine.Serpent.1.12.0.0\lib\net40\Razorvine.Serpent.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="App.config" />
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

View file

@@ -0,0 +1,106 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Sql;
namespace Microsoft.Spark.CSharp.Examples
{
/// <summary>
/// This example shows how to use Cassandra with the DataFrame API in the .NET API for Apache Spark
/// It covers loading rows from Cassandra, filtering them and saving the results to another table
/// </summary>
public class CassandraDataFrameExample
{
static void Main(string[] args)
{
var cassandraHostName = "localhost";
var cassandraKeySpace = "ks";
var cassandraTableToRead = "users";
var cassandraTableToInsert = "filteredusers";
if (args.Length == 4)
{
cassandraHostName = args[0];
cassandraKeySpace = args[1];
cassandraTableToRead = args[2];
cassandraTableToInsert = args[3];
}
/*
** CQL used to create table in Cassandra for this example **
CREATE TABLE users (
username VARCHAR,
firstname VARCHAR,
lastname VARCHAR,
PRIMARY KEY (username)
);
INSERT INTO ks.users (username, firstname, lastname) VALUES ('JD123', 'John', 'Doe');
INSERT INTO ks.users (username, firstname, lastname) VALUES ('BillJ', 'Bill', 'Jones');
INSERT INTO ks.users (username, firstname, lastname) VALUES ('SL', 'Steve', 'Little');
CREATE TABLE filteredusers (
username VARCHAR,
firstname VARCHAR,
lastname VARCHAR,
PRIMARY KEY (username)
);
*/
var sparkConf = new SparkConf().Set("spark.cassandra.connection.host", cassandraHostName);
var sparkContext = new SparkContext(sparkConf);
var sqlContext = new SqlContext(sparkContext);
//read from cassandra table
var usersDataFrame =
sqlContext.Read()
.Format("org.apache.spark.sql.cassandra")
.Options(new Dictionary<string, string> { {"keyspace", cassandraKeySpace }, { "table", cassandraTableToRead } })
.Load();
//display rows in the console
usersDataFrame.Show();
var createTempTableStatement =
string.Format(
"CREATE TEMPORARY TABLE userstemp USING org.apache.spark.sql.cassandra OPTIONS(table \"{0}\", keyspace \"{1}\")",
cassandraTableToRead,
cassandraKeySpace);
//create a temp table
sqlContext.Sql(createTempTableStatement);
//read from temp table, filter it and display schema and rows
var filteredUsersDataFrame = sqlContext.Sql("SELECT * FROM userstemp").Filter("username = 'SL'");
filteredUsersDataFrame.ShowSchema();
filteredUsersDataFrame.Show();
//write filtered rows to another table
filteredUsersDataFrame.Write()
.Format("org.apache.spark.sql.cassandra")
.Options(new Dictionary<string, string> { { "keyspace", cassandraKeySpace }, { "table", cassandraTableToInsert } })
.Save();
//convert to RDD, execute map & filter and collect result
var rddCollectedItems = usersDataFrame.ToRDD()
.Map(
r =>
string.Format("{0},{1},{2}", r.GetAs<string>("username"),
r.GetAs<string>("firstname"),
r.GetAs<string>("lastname")))
.Filter(s => s.Contains("SL"))
.Collect();
foreach (var rddCollectedItem in rddCollectedItems)
{
Console.WriteLine(rddCollectedItem);
}
Console.WriteLine("Completed running example");
}
}
}

View file

@@ -0,0 +1,34 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("CassandraDataFrameExample")]
[assembly: AssemblyDescription("Example application for Mobius C# API to access Cassandra data using Apache Spark")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Microsoft Corporation")]
[assembly: AssemblyProduct("Microsoft Mobius")]
[assembly: AssemblyCopyright("Copyright © Microsoft Corporation 2015")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// [assembly: CLSCompliant(true)]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
[assembly: AssemblyVersion("1.0.0")]
[assembly: AssemblyFileVersion("1.0.0")]

View file

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="log4net" version="2.0.5" targetFramework="net45" />
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
<package id="Razorvine.Pyrolite" version="4.10.0.0" targetFramework="net45" />
<package id="Razorvine.Serpent" version="1.12.0.0" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="1.6.100" targetFramework="net45" />
</packages>

View file

@@ -11,7 +11,7 @@ using System.IO;
namespace Microsoft.Spark.CSharp.Examples
{
/// <summary>
/// This example shows how to use Hive in SparkCLR's C# API for Apache Spark DataFrame to
/// This example shows how to use Hive in Mobius .NET API for Apache Spark DataFrame to
/// load data from Hive.
/// </summary>
class HiveDataFrameExample

View file

@@ -10,7 +10,7 @@ using Microsoft.Spark.CSharp.Sql;
namespace Microsoft.Spark.CSharp.Examples
{
/// <summary>
/// This example shows how to use JDBC in SparkCLR's C# API for Apache Spark DataFrame to
/// This example shows how to use JDBC in Mobius C# API for Apache Spark DataFrame to
/// load data from SQL Server. The connection URL needs to be updated for a different JDBC source.
/// </summary>
class JdbcDataFrameExample

View file

@@ -10,7 +10,8 @@ using Microsoft.Spark.CSharp.Streaming;
namespace Microsoft.Spark.CSharp.Examples
{
/// <summary>
/// Sample SparkCLR application that processes events from EventHub in the format [timestamp],[loglevel],[logmessage]
/// Sample Mobius application that processes events from EventHub using .NET API for Apache Spark
/// Event format: [timestamp],[loglevel],[logmessage]
/// EventPublisher class may be used to publish sample events to EventHubs to consume in this app
/// </summary>
class SparkCLREventHubsExample

View file

@@ -13,7 +13,7 @@ using Microsoft.Spark.CSharp.Streaming;
namespace Microsoft.Spark.CSharp.Examples
{
/// <summary>
/// Sample SparkCLR application that processes events from Kafka in the format [timestamp],[loglevel],[logmessage]
/// Sample Mobius application that processes events from Kafka in the format [timestamp],[loglevel],[logmessage]
/// MessagePublisher class may be used to publish sample messages to Kafka to consume in this app
/// </summary>
class SparkClrKafkaExample

View file

@@ -149,7 +149,7 @@ The [instructions](./running-mobius-app.md#windows-instructions) above cover run
| Type | Examples |
| ------------- |--------------|
| Batch | <ul><li>[Pi](#pi-example-batch)</li><li>[Word Count](#wordcount-example-batch)</li></ul> |
| SQL | <ul><li>[JDBC](#jdbc-example-sql)</li><li>[Spark-XML](#spark-xml-example-sql)</li><li>[Hive](#hive-example-sql)</li></ul> |
| SQL | <ul><li>[JDBC](#jdbc-example-sql)</li><li>[Spark-XML](#spark-xml-example-sql)</li><li>[Hive](#hive-example-sql)</li><li>[Cassandra](#cassandra-example-sql)</li></ul> |
| Streaming | <ul><li>[Kafka](#kafka-example-streaming)</li><li>[EventHubs](#eventhubs-example-streaming)</li><li>[HDFS Word Count](#hdfswordcount-example-streaming)</li></ul> |
The following sample commands show how to run Mobius examples in local mode. Using the instructions above, these sample commands can be tweaked to run in other modes.
@@ -180,6 +180,19 @@ Displays the number of XML elements in the input XML file provided as the first
`sparkclr-submit.cmd --jars <jar files used for using Hive in Spark> --exe HiveDataFrame.exe C:\Git\Mobius\examples\Sql\HiveDataFrame\bin\Debug`
Reads data from a csv file, creates a Hive table and reads data from it
### Cassandra Example (Sql)
* Download the following jars that are needed to use Spark with Cassandra. **Note** that you need to get the right versions of the jar files for your versions of Spark and Cassandra. Refer to the [version compatibility](https://github.com/datastax/spark-cassandra-connector#version-compatibility) table for details.
* [spark-cassandra-connector_2.10-1.6.0.jar](https://spark-packages.org/package/datastax/spark-cassandra-connector)
* cassandra-driver-core-3.0.2.jar
* guava-19.0.jar
* jsr166e-1.1.0.jar
* Create the keyspace and tables, and insert the data necessary for testing. See the CassandraDataFrameExample code above for the CQL used to set up the test data.
* `sparkclr-submit.cmd --jars <jar files used for using Cassandra in Spark> --exe CassandraDataFrameExample.exe C:\Git\Mobius\examples\Sql\CassandraDataFrame\bin\Debug`
* **Note** - If you created the keyspace and tables with names different from those in the example's CQL, or Cassandra is not running on localhost, pass arguments to the example: `sparkclr-submit.cmd --jars <jar files used for using Cassandra in Spark> --exe CassandraDataFrameExample.exe C:\Git\Mobius\examples\Sql\CassandraDataFrame\bin\Debug <host name> <keyspace name> <users table name> <filtered users table name>`
This sample reads data from a table, displays the results in the console, applies a filter on the DataFrame and writes the results to another table.
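For reference, a fully expanded command might look like the one below. This is only a sketch: `C:\spark\cassandra-jars` is a hypothetical download location for the jars listed above, and the comma-separated `--jars` value follows the spark-submit convention; adjust the jar versions to match your Spark and Cassandra versions per the compatibility table.
`sparkclr-submit.cmd --jars C:\spark\cassandra-jars\spark-cassandra-connector_2.10-1.6.0.jar,C:\spark\cassandra-jars\cassandra-driver-core-3.0.2.jar,C:\spark\cassandra-jars\guava-19.0.jar,C:\spark\cassandra-jars\jsr166e-1.1.0.jar --exe CassandraDataFrameExample.exe C:\Git\Mobius\examples\Sql\CassandraDataFrame\bin\Debug`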
### EventHubs Example (Streaming)
* Get the following jar files
* qpid-amqp-1-0-client-0.32.jar