This commit is contained in:
Devis Lucato 2016-09-30 16:39:57 -07:00
Коммит b3379d2865
37 изменённых файлов: 1733 добавлений и 0 удалений

82
.gitignore поставляемый Normal file
Просмотреть файл

@ -0,0 +1,82 @@
### Project specific ###
.idea
### MacOS ###
.DS_Store
### Scala ###
*.class
*.log
# sbt specific
.cache
.history
.lib/
dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
# Scala-IDE specific
.scala_dependencies
.worksheet
# ENSIME specific
.ensime_cache/
.ensime
### Intellij ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea/workspace.xml
.idea/tasks.xml
.idea/dictionaries
.idea/vcs.xml
.idea/jsLibraryMappings.xml
# Sensitive or high-churn files:
.idea/dataSources.ids
.idea/dataSources.xml
.idea/dataSources.local.xml
.idea/sqlDataSources.xml
.idea/dynamic.xml
.idea/uiDesigner.xml
# Gradle:
.idea/gradle.xml
.idea/libraries
# Mongo Explorer plugin:
.idea/mongoSettings.xml
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
### Intellij Patch ###
# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721
# *.iml
# modules.xml
# .idea/misc.xml
# *.ipr

22
Codestyle.IntelliJ.xml Normal file
Просмотреть файл

@ -0,0 +1,22 @@
<code_scheme name="Toketi">
<option name="RIGHT_MARGIN" value="80" />
<option name="JD_ADD_BLANK_AFTER_RETURN" value="true" />
<ScalaCodeStyleSettings>
<option name="PLACE_CLOSURE_PARAMETERS_ON_NEW_LINE" value="true" />
<option name="ALIGN_IN_COLUMNS_CASE_BRANCH" value="true" />
<option name="USE_ALTERNATE_CONTINUATION_INDENT_FOR_PARAMS" value="true" />
<option name="SD_BLANK_LINE_AFTER_PARAMETERS_COMMENTS" value="true" />
<option name="SD_BLANK_LINE_AFTER_RETURN_COMMENTS" value="true" />
<option name="SD_BLANK_LINE_BEFORE_PARAMETERS" value="true" />
<option name="REPLACE_CASE_ARROW_WITH_UNICODE_CHAR" value="true" />
<option name="REPLACE_MAP_ARROW_WITH_UNICODE_CHAR" value="true" />
<option name="REPLACE_FOR_GENERATOR_ARROW_WITH_UNICODE_CHAR" value="true" />
</ScalaCodeStyleSettings>
<codeStyleSettings language="JAVA">
<option name="KEEP_FIRST_COLUMN_COMMENT" value="false" />
</codeStyleSettings>
<codeStyleSettings language="Scala">
<option name="ALIGN_MULTILINE_PARAMETERS" value="false" />
<option name="ALIGN_GROUP_FIELD_DECLARATIONS" value="true" />
</codeStyleSettings>
</code_scheme>

20
LICENSE Normal file
Просмотреть файл

@ -0,0 +1,20 @@
Copyright (c) Microsoft Corporation
All rights reserved.
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the ""Software""), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

151
README.md Normal file
Просмотреть файл

@ -0,0 +1,151 @@
# IoTHubReact
IoTHub React is an Akka Stream library that can be used to read data from
[Azure IoTHub](https://azure.microsoft.com/en-us/services/iot-hub/).
Azure IoTHub is used to connect devices to the Azure cloud.
A simple example on how to use the library is the following:
```scala
new IoTHub().source()
.map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature]))
.filter(_.value > 100)
.to(console)
.run()
```
A stream of incoming telemetry data is read, parse and converted to a
`Temperature` object and filtered based on its value.
A more interesting example is reading telemetry data from Azure IoTHub, and
send it to a Kafka topic so it can be consumed by other services downstream
```scala
...
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.clients.producer.ProducerRecord
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
case class KafkaProducer(bootstrapServer: String)(implicit val system: ActorSystem) {
protected val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(bootstrapServer)
def getSink() = Producer.plainSink(producerSettings)
def packageMessage(elem: String, topic: String): ProducerRecord[Array[Byte], String] = {
new ProducerRecord[Array[Byte], String](topic, elem)
}
}
```
```scala
val kafkaProducer = KafkaProducer(bootstrapServer)
IoTHub.source()
.map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature]))
.filter(_.value > 100)
.runWith(kafkaProducer.getSink())
```
The library supports also reading from a single
[IoTHub partition](https://azure.microsoft.com/en-us/documentation/articles/event-hubs-overview),
so a service that consumes IoTHub events can create multiple streams and
process them independently.
```scala
val partitionNumber = 1
IoTHub.source(partitionNumber)
.map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature]))
.filter(_.value > 100)
.to(console)
.run()
```
## Configuration
IoTHubReact is available on Maven Central, you just need to add the following
reference in your `build.sbt` file:
```scala
libraryDependencies ++= {
val iothubReactV = "0.6.0"
Seq(
"com.microsoft.azure.iot" %% "iothub-react" % iothubReactV
)
}
```
### IoTHub configuration
IoTHubReact uses a configuration file to fetch the parameters required to
connect to Azure IoTHub.
The exact values to use can be found in the Azure Portal:
* **namespace**: it is the first part of the _Event Hub-compatible endpoint_,
which usually has this format: `sb://<IoTHub namespace>.servicebus.windows.net/`
* **name**: see _Event Hub-compatible name_
* **keyname**: usually the value is `service`
* **key**: the Primary Key that you can find under
_shared access policies/service_ policy (it's a base64 encoded string)
The values should be stored in your `application.conf` resource (or equivalent),
which can reference environment settings if you prefer.
```
iothub {
namespace = "<IoT hub namespace>"
name = "<IoT hub name>"
keyName = "<IoT hub key name>"
key = "<IoT hub key value>"
partitions = <IoT hub partitions>
}
````
Example using environment settings:
```
iothub {
namespace = ${?IOTHUB_NAMESPACE}
name = ${?IOTHUB_NAME}
keyName = ${?IOTHUB_ACCESS_KEY_NAME}
key = ${?IOTHUB_ACCESS_KEY_VALUE}
partitions = ${?IOTHUB_PARTITIONS}
}
````
There are other settings, to tune performance and connection details:
* **consumerGroup**: the
[consumer group](https://azure.microsoft.com/en-us/documentation/articles/event-hubs-overview)
used during the connection
* **receiverBatchSize**: the number of messages retrieved on each call to
Azure IoT hub. The default (an maximum) value is 999.
* **receiverTimeout**: timeout applied to calls while retrieving messages. The
default value is 3 seconds.
## Samples
In order to run the samples you need to have either the IoTHub settings defined
via environment variables, or in the config file. Follow the instructions in
the previous section on how to set the correct values.
* [`samples-scala`](samples-scala/src/main/scala):
You can use `sbt run` to run the demos.
* [`samples-java`](samples-java/src/main/java):
You can use
`mvn clean compile exec:java -Dexec.mainClass="DisplayMessages.Demo"`
to run the demo app.
## Future work
* improve asynchronicity by using EventHub SDK async APIs
* add support for checkpointing so the library can be used in scenarios where
the consuming service needs to be resilient to crashes/restarts
* add Sink for Cloud2Device scenarios. `IoTHub.Sink` will allow cloud services
to send messages to devices (via Azure IoTHub)
# Contribute Code
If you want/plan to contribute, we ask you to sign a
[CLA](https://cla.microsoft.com/) (Contribution license Agreement). A friendly
bot will remind you about it when you submit a pull-request.
If you are sending a pull request we kindly request to check the code style
with IntelliJ IDEA, importing the settings from `Codestyle.IntelliJ.xml`.

55
build.sbt Normal file
Просмотреть файл

@ -0,0 +1,55 @@
// Copyright (c) Microsoft. All rights reserved.
name := "iothub-react"
organization := "com.microsoft.azure.iot"
version := "0.6.0"
scalaVersion := "2.11.8"
// @todo Akka version depends on Scala version (2.10 => 2.3.15, 2.11 => 2.4.9)
//crossScalaVersions := Seq("2.10.6", "2.11.8")
logLevel := Level.Warn // Debug|Info|Warn|Error
scalacOptions ++= Seq("-deprecation", "-explaintypes", "-optimise",
"-unchecked", "-verbose")
libraryDependencies ++= {
val akkaStreamVersion = "2.4.9"
val azureEventHubSDKVersion = "0.8.2"
Seq(
// Library dependencies
"com.typesafe.akka" %% "akka-stream" % akkaStreamVersion,
"com.microsoft.azure" % "azure-eventhubs" % azureEventHubSDKVersion,
// Tests dependencies
"org.scalatest" %% "scalatest" % "3.0.0" % "test",
"com.microsoft.azure.iothub-java-client" % "iothub-java-device-client" % "1.0.14" % "test",
"com.jsuereth" %% "scala-arm" % "1.4" % "test",
// Remove < % "test" > to run samples-scala against the local workspace
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.3" % "test",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.8.2" % "test"
)
}
lazy val root = project.in(file(".")).configs(IntegrationTest)
/*
* Publishing options
* see http://www.scala-sbt.org/0.13/docs/Artifacts.html
*/
publishArtifact in Test := true
publishArtifact in(Compile, packageDoc) := true
publishArtifact in(Compile, packageSrc) := true
publishArtifact in(Compile, packageBin) := true
// Note: for Bintray, unpublish using SBT
licenses += ("MIT", url("https://github.com/Azure/toketi-iothubreact/blob/master/LICENSE"))
publishMavenStyle := true
// Bintray: Organization > Repository > Package > Version
bintrayOrganization := Some("microsoftazuretoketi")
bintrayRepository := "toketi-repo"
bintrayPackage := "iothub-react"
bintrayReleaseOnPublish in ThisBuild := true

1
project/build.properties Normal file
Просмотреть файл

@ -0,0 +1 @@
sbt.version = 0.13.12

1
project/plugins.sbt Normal file
Просмотреть файл

@ -0,0 +1 @@
addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")

50
samples-java/pom.xml Normal file
Просмотреть файл

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure.iot</groupId>
<artifactId>iothub-react-demo</artifactId>
<version>0.6.0</version>
<repositories>
<repository>
<id>toketi-snapshots</id>
<url>https://dl.bintray.com/microsoftazuretoketi/toketi-repo/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>com.microsoft.azure.iot</groupId>
<artifactId>iothub-react_2.11</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.11</artifactId>
<version>2.4.9</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.9</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

Просмотреть файл

@ -0,0 +1,38 @@
// Copyright (c) Microsoft. All rights reserved.
package DisplayMessages;
import akka.Done;
import akka.NotUsed;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.microsoft.azure.iot.iothubreact.javadsl.IoTHub;
import com.microsoft.azure.iot.iothubreact.IoTMessage;
import java.util.concurrent.CompletionStage;
/**
* Retrieve messages from IoT hub and display the data in the console
*/
public class Demo extends ReactiveStreamingApp {
static Source<IoTMessage, NotUsed> messagesFromAllPartitions;
public static void main(String args[]) {
// Source retrieving messages from one IoT hub partition (0 to N-1, where N is
// defined at deployment time)
//Source messagesFromOnePartition = new IoTHub().source(PARTITION);
// Source retrieving messages from all IoT hub partitions
Source messagesFromAllPartitions = new IoTHub().source();
messagesFromAllPartitions
.to(console())
.run(streamMaterializer);
}
public static Sink<IoTMessage, CompletionStage<Done>> console() {
return Sink.foreach(m -> System.out.println(m.deviceId() + ": " + m.contentAsString()));
}
}

Просмотреть файл

@ -0,0 +1,17 @@
// Copyright (c) Microsoft. All rights reserved.
package DisplayMessages;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
/**
* Initialize reactive streaming
*/
public class ReactiveStreamingApp {
private static ActorSystem system = ActorSystem.create("Demo");
protected final static Materializer streamMaterializer = ActorMaterializer.create(system);
}

Просмотреть файл

@ -0,0 +1,39 @@
// Configuration file [HOCON format]
// IoT Hub settings can be retrieved from the Azure portal at https://portal.azure.com
iothub {
// see: "IoT Hub" >> your hub >> "Messaging" >> "Partitions"
partitions = ${?IOTHUB_PARTITIONS}
// see: "IoT Hub" >> your hub >> "Messaging" >> "Event Hub-compatible name"
name = ${?IOTHUB_NAME}
// see: "IoT Hub" >> your hub > "Messaging" >> "Event Hub-compatible endpoint"
// e.g. from "sb://iothub-ns-toketi-i-18552-16281e72ba.servicebus.windows.net/"
// use "iothub-ns-toketi-i-18552-16281e72ba"
namespace = ${?IOTHUB_NAMESPACE}
// see: "IoT Hub" >> your hub >> "Shared access policies"
// e.g. you could use the predefined "iothubowner"
keyName = ${?IOTHUB_ACCESS_KEY_NAME}
// see: "IoT Hub" >> your hub >> "Shared access policies" >> key name >> "Primary key"
key = ${?IOTHUB_ACCESS_KEY_VALUE}
// see: "IoT Hub" >> your hub > "Messaging" >> Consumer groups
// "$Default" is predefined and is the typical scenario
consumerGroup = "$Default"
// Value expressed as a duration, e.g. 3s, 3000ms, "3 seconds", etc.
receiverTimeout = 3s
// How many messages to retrieve on each pull, max is 999
receiverBatchSize = 999
}
// @see http://doc.akka.io/docs/akka/2.4.10/scala/logging.html
akka {
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "WARNING"
}

21
samples-scala/build.sbt Normal file
Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
scalaVersion := "2.11.8"
// This repository contains development snapshots. Production releases are in Maven Central.
resolvers += "Toketi Dev Snapshots" at "https://dl.bintray.com/microsoftazuretoketi/toketi-repo"
libraryDependencies ++= {
val prodVersion = "0.6.0"
val devVersion = "0.6.0-dev"
Seq(
"com.microsoft.azure.iot" %% "iothub-react" % prodVersion,
// Jackson libraries for JSON marshalling and unmarshalling
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.3",
// Jackson module for scala object marshalling and unmarshalling
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.8.2"
)
}

Просмотреть файл

@ -0,0 +1 @@
sbt.version = 0.13.12

Просмотреть файл

@ -0,0 +1,39 @@
// Configuration file [HOCON format]
// IoT Hub settings can be retrieved from the Azure portal at https://portal.azure.com
iothub {
// see: "IoT Hub" >> your hub >> "Messaging" >> "Partitions"
partitions = ${?IOTHUB_PARTITIONS}
// see: "IoT Hub" >> your hub >> "Messaging" >> "Event Hub-compatible name"
name = ${?IOTHUB_NAME}
// see: "IoT Hub" >> your hub > "Messaging" >> "Event Hub-compatible endpoint"
// e.g. from "sb://iothub-ns-toketi-i-18552-16281e72ba.servicebus.windows.net/"
// use "iothub-ns-toketi-i-18552-16281e72ba"
namespace = ${?IOTHUB_NAMESPACE}
// see: "IoT Hub" >> your hub >> "Shared access policies"
// e.g. you could use the predefined "iothubowner"
keyName = ${?IOTHUB_ACCESS_KEY_NAME}
// see: "IoT Hub" >> your hub >> "Shared access policies" >> key name >> "Primary key"
key = ${?IOTHUB_ACCESS_KEY_VALUE}
// see: "IoT Hub" >> your hub > "Messaging" >> Consumer groups
// "$Default" is predefined and is the typical scenario
consumerGroup = "$Default"
// Value expressed as a duration, e.g. 3s, 3000ms, "3 seconds", etc.
receiverTimeout = 3s
// How many messages to retrieve on each pull, max is 999
receiverBatchSize = 999
}
// @see http://doc.akka.io/docs/akka/2.4.10/scala/logging.html
akka {
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "WARNING"
}

Просмотреть файл

@ -0,0 +1,64 @@
// Copyright (c) Microsoft. All rights reserved.
package MessagesThroughput
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Flow, Sink}
import com.microsoft.azure.iot.iothubreact.IoTMessage
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHub
import scala.concurrent.duration._
import scala.io.StdIn
/** Retrieve messages from IoT hub managing the stream velocity
*
* Demo showing how to:
* - Measure the streaming throughput
* - Traffic shaping by throttling the stream speed
* - How to combine multiple destinations
* - Back pressure
*/
object Demo extends App with ReactiveStreaming {
// Maximum speed allowed
val maxSpeed = 200
val showStatsEvery = 1 second
print(s"Do you want to test throttling (${maxSpeed} msg/sec) ? [y/N] ")
val input = StdIn.readLine()
val throttling = input.size > 0 && input(0).toUpper == 'Y'
// Stream throttling sink
val throttler = Flow[IoTMessage]
.throttle(maxSpeed, 1.second, maxSpeed / 10, ThrottleMode.Shaping)
.to(Sink.ignore)
// Messages throughput monitoring sink
val monitor = Sink.foreach[IoTMessage] {
m {
Monitoring.total += 1
Monitoring.totals(m.partition.get) += 1
}
}
// Sink combining throttling and monitoring
val throttleAndMonitor = Flow[IoTMessage]
.alsoTo(throttler)
// .alsoTo(...) // Using alsoTo it's possible to deliver to multiple destinations
.to(monitor)
// Start processing the stream
if (throttling) {
new IoTHub().source
.to(throttleAndMonitor)
.run()
} else {
new IoTHub().source
.to(monitor)
.run()
}
// Print statistics at some interval
Monitoring.printStatisticsWithFrequency(showStatsEvery)
}

Просмотреть файл

@ -0,0 +1,56 @@
// Copyright (c) Microsoft. All rights reserved.
package MessagesThroughput
import com.typesafe.config.ConfigFactory
import scala.collection.parallel.mutable.ParArray
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration, _}
/** Monitoring logic, some properties to keep count and a method to print the
* statistics.
* Note: for demo readability the monitoring Sink is in the Demo class
*/
object Monitoring {
// Auxiliary vars
private[this] val iotHubPartitions = ConfigFactory.load().getInt("iothub.partitions")
private[this] var previousTime : Long = 0
private[this] var previousTotal: Long = 0
// Total count of messages
var total: Long = 0
// Total per partition
var totals = new ParArray[Long](iotHubPartitions)
/* Schedule the stats to be printed with some frequency */
def printStatisticsWithFrequency(frequency: FiniteDuration): Unit = {
implicit val system = akka.actor.ActorSystem("system")
system.scheduler.schedule(1 seconds, frequency)(printStats)
}
/** Print the number of messages received from each partition, the total
* and the throughput msg/sec
*/
private[this] def printStats(): Unit = {
val now = java.time.Instant.now.toEpochMilli
if (total > 0 && previousTime > 0) {
print(s"Partitions: ")
for (i 0 until iotHubPartitions - 1) print(pad5(totals(i)) + ",")
print(pad5(totals(iotHubPartitions - 1)))
val throughput = ((total - previousTotal) * 1000 / (now - previousTime)).toInt
println(s" - Total: ${pad5(total)} - Speed: $throughput/sec")
}
previousTotal = total
previousTime = now
}
private[this] def pad5(x: Long): String = f"${x}%05d"
}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
package MessagesThroughput
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
/** Initialize reactive streaming
*
* @todo Don't use supervisor with Akka streams
*/
trait ReactiveStreaming {
val decider: Supervision.Decider = {
case e: Exception
println(e.getMessage)
Supervision.Resume
}
implicit val actorSystem = ActorSystem("Demo")
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
}

Просмотреть файл

@ -0,0 +1,44 @@
// Copyright (c) Microsoft. All rights reserved.
package OutputMessagesToConsole
import akka.stream.scaladsl.Sink
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHub
/** Retrieve messages from IoT hub and display the data sent from Temperature
* devices
*/
object Demo extends App with ReactiveStreaming {
// Source retrieving messages from all IoT hub partitions
val messagesFromAllPartitions = new IoTHub().source()
// Source retrieving only recent messages
// import java.time.Instant
// val messagesFromNowOn = new IoTHub().source(Instant.now())
// Source retrieving messages from one IoT hub partition (0 to N-1, where N is
// defined at deployment time)
// val messagesFromOnePartition = new IoTHub().source(PARTITION)
// Sink printing to the console
val console = Sink.foreach[Temperature] {
t println(s"Device ${t.deviceId}: temperature: ${t.value}F ; T=${t.time}")
}
// JSON parser setup
val jsonParser = new ObjectMapper()
jsonParser.registerModule(DefaultScalaModule)
// Start processing the stream
messagesFromAllPartitions
.map(m {
val temperature = jsonParser.readValue(m.contentAsString, classOf[Temperature])
temperature.deviceId = m.deviceId
temperature
})
.to(console)
.run()
}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
package OutputMessagesToConsole
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
/** Initialize reactive streaming
*
* @todo Don't use supervisor with Akka streams
*/
trait ReactiveStreaming {
val decider: Supervision.Decider = {
case e: Exception
println(e.getMessage)
Supervision.Resume
}
implicit val actorSystem = ActorSystem("Demo")
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
}

Просмотреть файл

@ -0,0 +1,34 @@
// Copyright (c) Microsoft. All rights reserved.
package OutputMessagesToConsole
import java.time.{ZoneId, ZonedDateTime}
/** Temperature measure by a device
*
* @param deviceId Device ID passed by the device (ideally this matches the ID registered in IoTHub)
* @param value Temperature value measured by the device
* @param time Time (as a string) when the device measured the temperature
*/
class Temperature(var deviceId: String,
val value: Float,
val time: String) {
// ISO8601 with and without milliseconds decimals
private val pattern1 = """(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+).(\d+)Z""".r
private val pattern2 = """(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)Z""".r
/** Parse the time sent by the device.
*
* @return the time as an object
*/
def getTime(): ZonedDateTime = {
time match {
case pattern1(y, m, d, h, i, s, n) ZonedDateTime.of(y.toInt, m.toInt, d.toInt, h.toInt, i.toInt, s.toInt, n.toInt * 1000000, ZoneId.of("UTC"))
case pattern2(y, m, d, h, i, s) ZonedDateTime.of(y.toInt, m.toInt, d.toInt, h.toInt, i.toInt, s.toInt, 0, ZoneId.of("UTC"))
case null null
case _ throw new Exception(s"wrong date time format: $time")
}
}
}

Просмотреть файл

@ -0,0 +1,39 @@
// Configuration file [HOCON format]
// IoT Hub settings can be retrieved from the Azure portal at https://portal.azure.com
iothub {
// see: "IoT Hub" >> your hub >> "Messaging" >> "Partitions"
partitions = ${?IOTHUB_PARTITIONS}
// see: "IoT Hub" >> your hub >> "Messaging" >> "Event Hub-compatible name"
name = ${?IOTHUB_NAME}
// see: "IoT Hub" >> your hub > "Messaging" >> "Event Hub-compatible endpoint"
// e.g. from "sb://iothub-ns-toketi-i-18552-16281e72ba.servicebus.windows.net/"
// use "iothub-ns-toketi-i-18552-16281e72ba"
namespace = ${?IOTHUB_NAMESPACE}
// see: "IoT Hub" >> your hub >> "Shared access policies"
// e.g. you could use the predefined "iothubowner"
keyName = ${?IOTHUB_ACCESS_KEY_NAME}
// see: "IoT Hub" >> your hub >> "Shared access policies" >> key name >> "Primary key"
key = ${?IOTHUB_ACCESS_KEY_VALUE}
// see: "IoT Hub" >> your hub > "Messaging" >> Consumer groups
// "$Default" is predefined and is the typical scenario
consumerGroup = "$Default"
// Value expressed as a duration, e.g. 3s, 3000ms, "3 seconds", etc.
receiverTimeout = 3s
// How many messages to retrieve on each pull, max is 999
receiverBatchSize = 999
}
// @see http://doc.akka.io/docs/akka/2.4.10/scala/logging.html
akka {
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "ERROR"
}

Просмотреть файл

@ -0,0 +1,58 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact
import com.microsoft.azure.eventhubs.EventHubClient
import com.typesafe.config.{Config, ConfigFactory}
/** Hold IoT Hub configuration settings
*
* @see https://github.com/typesafehub/config for information about the
* configuration file formats
* @todo dependency injection
*/
private object Configuration {
// Maximum size supported by the client
private[this] val MaxBatchSize = 999
// Maximum size supported by the client
private[this] val DefaultReceiverTimeout = 3000
private[this] val conf: Config = ConfigFactory.load()
// IoT hub storage details
val iotHubPartitions: Int = conf.getInt("iothub.partitions")
val iotHubNamespace : String = conf.getString("iothub.namespace")
val iotHubName : String = conf.getString("iothub.name")
val iotHubKeyName : String = conf.getString("iothub.keyName")
val iotHubKey : String = conf.getString("iothub.key")
// Consumer group used to retrieve messages
// @see https://azure.microsoft.com/en-us/documentation/articles/event-hubs-overview
private[this] val tmpCG = conf.getString("iothub.consumerGroup")
val receiverConsumerGroup: String =
tmpCG match {
case "$Default" EventHubClient.DEFAULT_CONSUMER_GROUP_NAME
case "Default" EventHubClient.DEFAULT_CONSUMER_GROUP_NAME
case "default" EventHubClient.DEFAULT_CONSUMER_GROUP_NAME
case "DEFAULT" EventHubClient.DEFAULT_CONSUMER_GROUP_NAME
case _ tmpCG
}
// Message retrieval timeout in milliseconds
private[this] val tmpRTO = conf.getDuration("iothub.receiverTimeout").toMillis
val receiverTimeout: Long =
if (tmpRTO > 0)
tmpRTO
else
DefaultReceiverTimeout
// How many messages to retrieve on each call to the storage
private[this] val tmpRBS = conf.getInt("iothub.receiverBatchSize")
val receiverBatchSize: Int =
if (tmpRBS > 0 && tmpRBS <= MaxBatchSize)
tmpRBS
else
MaxBatchSize
}

Просмотреть файл

@ -0,0 +1,20 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact
import com.microsoft.azure.eventhubs.EventHubClient
import com.microsoft.azure.servicebus.ConnectionStringBuilder
private object IoTHubStorage {
private[this] val connString = new ConnectionStringBuilder(
Configuration.iotHubNamespace,
Configuration.iotHubName,
Configuration.iotHubKeyName,
Configuration.iotHubKey).toString
// @todo Manage transient errors e.g. timeouts
// EventHubClient.createFromConnectionString(connString)
// .get(Configuration.receiverTimeout, TimeUnit.MILLISECONDS)
def createClient(): EventHubClient = EventHubClient.createFromConnectionStringSync(connString)
}

Просмотреть файл

@ -0,0 +1,56 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact
import java.time.Instant
import com.microsoft.azure.eventhubs.EventData
/* IoTMessage factory */
private object IoTMessage {
/** Create a user friendly representation of the IoT message from the raw
* data coming from the storage
*
* @param rawData Raw data retrieved from the IoT hub storage
* @param partition Storage partition where the message was retrieved from
*
* @return
*/
def apply(rawData: EventData, partition: Option[Int]): IoTMessage = {
new IoTMessage(rawData, partition)
}
}
/** Expose the IoT message body and timestamp
*
* @param partition Storage partition where the message was retrieved from
*/
class IoTMessage(data: EventData, val partition: Option[Int]) {
// Internal properties set by IoT stoage
private[this] lazy val systemProps = data.getSystemProperties()
/** Time when the message was received by IoT hub service
* Note that this might differ from the time set by the device, e.g. in case
* of batch uploads
*/
lazy val created: Instant = systemProps.getEnqueuedTime
/** IoT message offset
* Useful for example to store the current position in the stream
*/
lazy val offset: String = systemProps.getOffset
// IoT message sequence number
lazy val sequenceNumber: Long = systemProps.getSequenceNumber
// ID of the device who sent the message
lazy val deviceId: String = systemProps.get("iothub-connection-device-id").toString
// IoT message content bytes
lazy val content: Array[Byte] = data.getBody
// IoT message content as string, e.g. JSON/XML/etc.
lazy val contentAsString: String = new String(content)
}

Просмотреть файл

@ -0,0 +1,134 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact
import java.time.Instant
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import com.microsoft.azure.eventhubs.{EventData, PartitionReceiver}
import scala.collection.convert.decorateAsScala._
private object IoTMessageSource {
/** Create an instance of the messages source for the specified partition
*
* @param partition IoT hub partition to read
* @param offset Starting position, offset of the first message
*
* @return A source returning the body of the message sent from a device.
* Deserialization is left to the consumer.
*/
def apply(partition: Int, offset: String): Source[IoTMessage, NotUsed] = {
Source.fromGraph(new IoTMessageSource(partition, offset))
}
/** Create an instance of the messages source for the specified partition
*
* @param partition IoT hub partition to read
* @param startTime Starting position expressed in time
*
* @return A source returning the body of the message sent from a device.
* Deserialization is left to the consumer.
*/
def apply(partition: Int, startTime: Instant): Source[IoTMessage, NotUsed] = {
Source.fromGraph(new IoTMessageSource(partition, startTime))
}
}
/** Source of messages from one partition of the IoT hub storage
*
* @param partition Partition number (0 to N-1) to read from
* @param offset Starting position
*
* @todo Refactor and use async methods, compare performance
* @todo Consider option to deserialize on the fly to [T], assuming JSON format
*/
private class IoTMessageSource(val partition: Int, val offset: String)
extends GraphStage[SourceShape[IoTMessage]]
with Logger {
abstract class OffsetType
case class SequenceOffset() extends OffsetType
case class TimeOffset() extends OffsetType
def this(partition: Int, startTime: Instant) {
this(partition, "*not used*")
offsetType = TimeOffset()
_startTime = startTime
}
// When retrieving messages, include the message with the initial offset
private[this] val OFFSET_INCLUSIVE = true
// Time of offset used when defining the start of the stream
private[this] var offsetType: OffsetType = SequenceOffset()
private[this] var _startTime: Instant = Instant.MIN
// Define the (sole) output port of this stage
private[this] val out: Outlet[IoTMessage] = Outlet("IoTMessageSource")
// Define the shape of this stage => SourceShape with the port defined above
override val shape: SourceShape[IoTMessage] = SourceShape(out)
// All state MUST be inside the GraphStageLogic, never inside the enclosing
// GraphStage. This state is safe to read/write from all the callbacks
// provided by GraphStageLogic and the registered handlers.
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
// Connect to the IoT hub storage
lazy val receiver: PartitionReceiver = offsetType match {
case SequenceOffset() {
log.info(s"Connecting to partition ${partition.toString} starting from ${offset}")
IoTHubStorage
.createClient()
.createReceiverSync(
Configuration.receiverConsumerGroup,
partition.toString,
offset,
OFFSET_INCLUSIVE)
}
case TimeOffset() {
log.info(s"Connecting to partition ${partition.toString} starting from ${_startTime}")
IoTHubStorage
.createClient()
.createReceiverSync(
Configuration.receiverConsumerGroup,
partition.toString,
_startTime)
}
}
val emptyResult = new Array[IoTMessage](0).toList
// @todo Consider pausing on empty partitions
setHandler(out, new OutHandler {
override def onPull(): Unit = {
try {
val messages: java.lang.Iterable[EventData] =
receiver.receiveSync(Configuration.receiverBatchSize)
if (messages == null) {
log.debug(s"Partition ${partition} is empty")
emitMultiple(out, emptyResult)
} else {
val iterator: scala.collection.immutable.Iterable[IoTMessage] =
messages.asScala.map(e IoTMessage(e, Some(partition))).toList
emitMultiple(out, iterator)
}
} catch {
case e: Exception {
log.error(e, "Fatal error: " + e.getMessage)
}
}
}
})
}
}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact
import akka.actor.ActorSystem
import akka.event.{LogSource, Logging}
/** Common logger via Akka
*
* @see http://doc.akka.io/docs/akka/2.4.10/scala/logging.html
*/
private trait Logger {
implicit val logSource = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
override def getClazz(o: AnyRef): Class[_] = o.getClass
}
val log = Logging(ActorSystem("IoTHubReact"), this)
}

Просмотреть файл

@ -0,0 +1,92 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.javadsl
import java.time.Instant
import akka.NotUsed
import akka.stream.javadsl.{Source SourceJavaDSL}
import com.microsoft.azure.eventhubs.PartitionReceiver
import com.microsoft.azure.iot.iothubreact.IoTMessage
import com.microsoft.azure.iot.iothubreact.scaladsl.{IoTHub IoTHubScalaDSL}
/** Provides a streaming source to retrieve messages from Azure IoT Hub
*
* @todo Support reading the same partition from multiple clients
*/
class IoTHub() extends {
// Offset used to start reading from the beginning
val OffsetStartOfStream: String = PartitionReceiver.START_OF_STREAM
private lazy val iotHub = new IoTHubScalaDSL()
/** Stream returning all the messages since the beginning, from the specified
* partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
*
* @return A source of IoT messages
*/
def source(partition: Int): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(partition))
}
/** Stream returning all the messages from the given offset, from the
* specified partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
* @param offset Starting position, offset of the first message
*
* @return A source of IoT messages
*/
def source(partition: Int, offset: String): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(partition, offset))
}
/** Stream returning all the messages from the given offset, from the
* specified partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
* @param startTime Starting position expressed in time
*
* @return A source of IoT messages
*/
def source(partition: Int, startTime: Instant): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(partition, startTime))
}
/** Stream returning all the messages since the beginning, from all the
* configured partitions.
*
* @return A source of IoT messages
*/
def source(): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source())
}
/** Stream returning all the messages starting from the given offset, from all
* the configured partitions.
*
* @param offset Starting position
*
* @return A source of IoT messages
*/
def source(offset: String): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(offset))
}
/** Stream returning all the messages starting from the given time, from all
* the configured partitions.
*
* @param startTime Starting position expressed in time
*
* @return A source of IoT messages
*/
def source(startTime: Instant): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(startTime))
}
}

Просмотреть файл

@ -0,0 +1,119 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.scaladsl
import java.time.Instant
import akka.NotUsed
import akka.stream.SourceShape
import akka.stream.scaladsl._
import com.microsoft.azure.eventhubs.PartitionReceiver
import com.microsoft.azure.iot.iothubreact._
/** Provides a streaming source to retrieve messages from Azure IoT Hub
*
* @todo Support reading the same partition from multiple clients
*/
class IoTHub {
// Offset used to start reading from the beginning
val OffsetStartOfStream: String = PartitionReceiver.START_OF_STREAM
/** Stream returning all the messages since the beginning, from the specified
* partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
*
* @return A source of IoT messages
*/
def source(partition: Int): Source[IoTMessage, NotUsed] = {
IoTMessageSource(partition, OffsetStartOfStream)
}
/** Stream returning all the messages from the given offset, from the
* specified partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
* @param offset Starting position, offset of the first message
*
* @return A source of IoT messages
*/
def source(partition: Int, offset: String): Source[IoTMessage, NotUsed] = {
IoTMessageSource(partition, offset)
}
/** Stream returning all the messages from the given offset, from the
* specified partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
* @param startTime Starting position expressed in time
*
* @return A source of IoT messages
*/
def source(partition: Int, startTime: Instant): Source[IoTMessage, NotUsed] = {
IoTMessageSource(partition, startTime)
}
/** Stream returning all the messages since the beginning, from all the
* configured partitions.
*
* @return A source of IoT messages
*/
def source(): Source[IoTMessage, NotUsed] = source(OffsetStartOfStream)
/** Stream returning all the messages starting from the given offset, from all
* the configured partitions.
*
* @param offset Starting position
*
* @return A source of IoT messages
*/
def source(offset: String): Source[IoTMessage, NotUsed] = {
source(offset, Instant.MIN, false)
}
/** Stream returning all the messages starting from the given time, from all
* the configured partitions.
*
* @param startTime Starting position expressed in time
*
* @return A source of IoT messages
*/
def source(startTime: Instant): Source[IoTMessage, NotUsed] = {
source("", startTime, true)
}
/** Stream returning all the messages, from the given starting point
*
* @param offset Starting position using the offset property in the messages
* @param startTime Starting position expressed in time
* @param timeOffset Whether the start point is a timestamp
*
* @return A source of IoT messages
*/
private[this] def source(
offset: String,
startTime: Instant,
timeOffset: Boolean): Source[IoTMessage, NotUsed] = {
val graph = GraphDSL.create() {
implicit b
import GraphDSL.Implicits._
val merge = b.add(Merge[IoTMessage](Configuration.iotHubPartitions))
for (p 0 until Configuration.iotHubPartitions) {
val graph = if (timeOffset) IoTMessageSource(p, startTime) else IoTMessageSource(p, offset)
val source = Source.fromGraph(graph).async
source ~> merge
}
SourceShape(merge.out)
}
Source.fromGraph(graph)
}
}

Просмотреть файл

@ -0,0 +1,15 @@
iothub {
partitions = ${?IOTHUB_CI_PARTITIONS}
name = ${?IOTHUB_CI_NAME}
namespace = ${?IOTHUB_CI_NAMESPACE}
keyName = ${?IOTHUB_CI_ACCESS_KEY0_NAME}
key = ${?IOTHUB_CI_ACCESS_KEY0_VALUE}
devices = ${?IOTHUB_CI_DEVICES_JSON_FILE}
receiverBatchSize = 3
receiverTimeout = 3s
}
akka {
loglevel = "INFO"
}

Просмотреть файл

@ -0,0 +1,215 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.test
import java.time.Instant
import akka.NotUsed
import akka.actor.Props
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import com.microsoft.azure.iot.iothubreact.IoTMessage
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHub
import com.microsoft.azure.iot.iothubreact.test.helpers.{Counter, Device, Logger, ReactiveStreaming}
import org.scalatest._
import resource._
import scala.collection.parallel.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
/** Tests streaming against Azure IoT hub endpoint
*
* Note: the tests require an actual hub ready to use
*/
class IoTHubReactiveStreamingUserStory
extends FeatureSpec
with GivenWhenThen
with ReactiveStreaming
with Logger {
info("As a client of Azure IoT hub")
info("I want to be able to receive all the messages as a stream")
info("So I can process them asynchronously and at scale")
val counter = actorSystem.actorOf(Props[Counter], "Counter")
counter ! "reset"
def readCounter: Long = {
Await.result(counter.ask("get")(5 seconds), 5 seconds).asInstanceOf[Long]
}
feature("All IoT messages are presented as an ordered stream") {
scenario("Developer wants to retrieve IoT messages") {
Given("An IoT hub is configured")
val hub = new IoTHub()
When("A developer wants to fetch messages from Azure IoT hub")
val messagesFromOnePartition: Source[IoTMessage, NotUsed] = hub.source(1)
val messagesFromAllPartitions: Source[IoTMessage, NotUsed] = hub.source
val messagesFromNowOn: Source[IoTMessage, NotUsed] = hub.source(Instant.now())
Then("The messages are presented as a stream")
messagesFromOnePartition.to(Sink.ignore)
messagesFromAllPartitions.to(Sink.ignore)
messagesFromNowOn.to(Sink.ignore)
}
scenario("Application wants to retrieve all IoT messages") {
// How many seconds we allow the test to wait for messages from the stream
val TestTimeout = 60 seconds
// A label shared by all the messages, to filter out data sent by other tests
val testRunId: String = "[RetrieveAll-" + java.util.UUID.randomUUID().toString + "]"
// We'll use this as the streaming start date
val startTime = Instant.now()
log.info(s"Test run: ${testRunId}, Start time: ${startTime}")
Given("An IoT hub is configured")
val messages = new IoTHub().source(startTime)
And("5 devices have sent 4 messages each")
var expectedMessageCount: Long = 0
for {
device1 managed(new Device("device10000"))
device2 managed(new Device("device10001"))
device3 managed(new Device("device10002"))
device4 managed(new Device("device10003"))
device5 managed(new Device("device10004"))
} {
for (i 1 to 4) {
device1.sendMessage(testRunId, i)
device2.sendMessage(testRunId, i)
device3.sendMessage(testRunId, i)
device4.sendMessage(testRunId, i)
device5.sendMessage(testRunId, i)
expectedMessageCount += 5
}
log.info(s"Messages sent: $expectedMessageCount")
}
When("A client application processes messages from the stream")
counter ! "reset"
val count = Sink.foreach[IoTMessage] {
m counter ! "inc"
}
messages
.filter(m m.contentAsString contains testRunId)
.to(count)
.run()
Then("Then the client application receives all the messages sent")
var time = TestTimeout.toMillis.toInt
val pause = time / 10
var actualMessageCount = readCounter
while (time > 0 && actualMessageCount < expectedMessageCount) {
Thread.sleep(pause)
time -= pause
actualMessageCount = readCounter
log.info(s"Messages received so far: ${actualMessageCount} [Time left ${time / 1000} secs]")
}
assert(
actualMessageCount == expectedMessageCount,
s"Expecting ${expectedMessageCount} messages but received ${actualMessageCount}")
}
scenario("Customer needs to process IoT messages in the right order") {
// How many seconds we allow the test to wait for messages from the stream
val TestTimeout = 120 seconds
val MessagesPerDevice = 200
// A label shared by all the messages, to filter out data sent by other tests
val testRunId: String = "[VerifyOrder-" + java.util.UUID.randomUUID().toString + "]"
// We'll use this as the streaming start date
val startTime = Instant.now()
log.info(s"Test run: ${testRunId}, Start time: ${startTime}")
Given("An IoT hub is configured")
val messages = new IoTHub().source(startTime)
And(s"10 devices have sent ${MessagesPerDevice} messages each")
var expectedMessageCount: Int = 0
for {
device1 managed(new Device("device10000"))
device2 managed(new Device("device10001"))
device3 managed(new Device("device10002"))
device4 managed(new Device("device10003"))
device5 managed(new Device("device10004"))
device6 managed(new Device("device10005"))
device7 managed(new Device("device10006"))
device8 managed(new Device("device10007"))
device9 managed(new Device("device10008"))
device10 managed(new Device("device10009"))
} {
for (i 1 to MessagesPerDevice) {
device1.sendMessage(testRunId, i)
device2.sendMessage(testRunId, i)
device3.sendMessage(testRunId, i)
device4.sendMessage(testRunId, i)
device5.sendMessage(testRunId, i)
device6.sendMessage(testRunId, i)
device7.sendMessage(testRunId, i)
device8.sendMessage(testRunId, i)
device9.sendMessage(testRunId, i)
device10.sendMessage(testRunId, i)
expectedMessageCount += 10
}
log.info(s"Messages sent: $expectedMessageCount")
}
When("A client application processes messages from the stream")
Then("Then the client receives all the messages ordered within each device")
counter ! "reset"
val cursors = new mutable.ParHashMap[String, Long]
val verifier = Sink.foreach[IoTMessage] {
m {
counter ! "inc"
log.debug(s"device: ${m.deviceId}, seq: ${m.sequenceNumber} ")
if (!cursors.contains(m.deviceId)) {
cursors.put(m.deviceId, m.sequenceNumber)
}
if (cursors(m.deviceId) > m.sequenceNumber) {
fail(s"Message out of order. " +
s"Device ${m.deviceId}, message ${m.sequenceNumber} arrived " +
s"after message ${cursors(m.deviceId)}")
}
cursors.put(m.deviceId, m.sequenceNumber)
}
}
messages
.filter(m m.contentAsString contains (testRunId))
.to(verifier)
.run()
// Wait till all messages have been verified
var time = TestTimeout.toMillis.toInt
val pause = time / 10
var actualMessageCount = readCounter
while (time > 0 && actualMessageCount < expectedMessageCount) {
Thread.sleep(pause)
time -= pause
actualMessageCount = readCounter
log.info(s"Messages received so far: ${actualMessageCount} [Time left ${time / 1000} secs]")
}
assert(
actualMessageCount == expectedMessageCount,
s"Expecting ${expectedMessageCount} messages but received ${actualMessageCount}")
}
}
}

Просмотреть файл

@ -0,0 +1,36 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.test.helpers
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.microsoft.azure.eventhubs.EventHubClient
import com.typesafe.config.{Config, ConfigFactory}
import scala.reflect.io.File
/* Test configuration settings */
private object Configuration {
private[this] val conf: Config = ConfigFactory.load()
// Read-only settings
val iotHubPartitions: Int = conf.getInt("iothub.partitions")
val iotHubNamespace : String = conf.getString("iothub.namespace")
val iotHubName : String = conf.getString("iothub.name")
val iotHubKeyName : String = conf.getString("iothub.keyName")
val iotHubKey : String = conf.getString("iothub.key")
// Tests can override these
var iotReceiverConsumerGroup: String = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME
var receiverTimeout : Long = conf.getDuration("iothub.receiverTimeout").toMillis
var receiverBatchSize : Int = conf.getInt("iothub.receiverBatchSize")
// Read devices configuration from JSON file
private[this] val jsonParser = new ObjectMapper()
jsonParser.registerModule(DefaultScalaModule)
private[this] lazy val devicesJson = File(conf.getString("iothub.devices")).slurp()
private[this] lazy val devices = jsonParser.readValue(devicesJson, classOf[Array[DeviceCredentials]])
def deviceCredentials(id: String): DeviceCredentials = devices.find(x x.deviceId == id).get
}

Просмотреть файл

@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.test.helpers
import java.util.concurrent.Executors
import akka.actor.Actor
import scala.concurrent.ExecutionContext
/* Thread safe counter */
class Counter extends Actor {
implicit val executionContext = ExecutionContext
.fromExecutorService(Executors.newFixedThreadPool(sys.runtime.availableProcessors))
private[this] var count: Long = 0
override def receive: Receive = {
case "reset" count = 0
case "inc" count += 1
case "get" sender() ! count
}
}

Просмотреть файл

@ -0,0 +1,55 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.test.helpers
import com.microsoft.azure.iothub._
import resource.Resource
/* Companion object required by `Resource` for type inference */
object Device {
implicit object TypeClassResourceHelper extends Resource[Device] {
override def close(r: Device): Unit = r.disconnect()
}
}
/* Test helper to send messages to the hub */
class Device(deviceId: String) extends Resource[Device] with Logger {
private[this] class EventCallback extends IotHubEventCallback {
override def execute(status: IotHubStatusCode, context: scala.Any): Unit = {
val i = context.asInstanceOf[Int]
log.debug(s"Message ${i} status ${status.name()}")
}
}
// Load device credentials
private[this] lazy val credentials = Configuration.deviceCredentials(deviceId)
// Prepare connection string for this device
private[this] lazy val connString = DeviceConnectionString.build(
Configuration.iotHubName, credentials.deviceId, credentials.primaryKey)
// Prepare client to send messages
private[this] lazy val client = new DeviceClient(connString, IotHubClientProtocol.AMQPS)
/** Note: automatically invoked by Resource[A]
*
* @param resource Resource being disposed
*/
override def close(resource: Device): Unit = {
resource.disconnect()
}
def disconnect(): Unit = {
client.close()
}
def sendMessage(text: String, sequenceNumber: Int): Unit = {
client.open()
log.debug(s"Device '$deviceId' sending '$text'")
val message = new Message(text)
client.sendEventAsync(message, new EventCallback(), sequenceNumber)
}
}

Просмотреть файл

@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.test.helpers
/* Format a connection string accordingly to SDK */
object DeviceConnectionString {
/** Format a connection string accordingly to SDK
*
* @param hubName IoT hub name
* @param deviceId Device ID
* @param accessKey Device authorization key
*
* @return A connection string
*/
def build(hubName: String, deviceId: String, accessKey: String): String = {
s"HostName=$hubName.azure-devices.net;DeviceId=$deviceId;SharedAccessKey=$accessKey"
}
}

Просмотреть файл

@ -0,0 +1,10 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.test.helpers
/** Model used to deserialize the device credentials
*
* @param deviceId Device ID
* @param primaryKey Device authoriazion key
*/
class DeviceCredentials(val deviceId: String, val primaryKey: String) {}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.test.helpers
import akka.actor.ActorSystem
import akka.event.{LogSource, Logging}
/** Common logger via Akka
*
* @see http://doc.akka.io/docs/akka/2.4.10/scala/logging.html
*/
trait Logger {
implicit val logSource = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
override def getClazz(o: AnyRef): Class[_] = o.getClass
}
val log = Logging(ActorSystem("IoTHubReactTests"), this)
}

Просмотреть файл

@ -0,0 +1,22 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.test.helpers
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
/** Initialize reactive streaming
*
* @todo Don't use supervisor with Akka streams
*/
trait ReactiveStreaming {
val decider: Supervision.Decider = {
case e: Exception
println(e.getMessage)
Supervision.Resume
}
implicit val actorSystem = ActorSystem("Tests")
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem)
.withSupervisionStrategy(decider))
}