* Add checkpointing feature: save the current position of the streams and automatically restart from the previous position. The position can be saved either in Azure blobs or Cassandra.
* Support filtering the stream by message type (aka "model"), so that a device can send different kinds of telemetry.
* Keep streams alive in the absence of traffic
* Add device simulator app to simulate temperature and humidity sensors
* Add support for Scala 2.12.0-RC1
* Overall refactoring to reduce the amount of code on the client side
* Retry logic on storage operations
* Add Travis CI config
* Improve documentation
* Remove dependency on scala-arm
* Start move to json4s
* Change code style to 100 columns
* Add script to run the sample demos
* API changes, split IoTHub in two classes
This commit is contained in:
Devis Lucato 2016-10-26 16:13:10 -07:00 committed by GitHub
Parent f3996eb816
Commit 0b393e643e
68 changed files with 2754 additions and 529 deletions

5
.gitignore vendored

@ -1,5 +1,7 @@
### Project specific ###
.idea
tools/devices-simulator/credentials.js
*.crt
### MacOS ###
.DS_Store
@ -27,6 +29,9 @@ project/plugins/project/
.ensime_cache/
.ensime
### Node.js ###
node_modules/
### Intellij ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839

16
.travis.yml Normal file

@ -0,0 +1,16 @@
jdk: oraclejdk8
language: scala
scala:
- 2.11.8
- 2.12.0-RC1
cache:
directories:
- "$HOME/.ivy2"
- "$HOME/.sbt"
- "$HOME/.m2"
notifications:
slack:
secure: S6pcmclrj9vaqHOFMrjgYkF6wXrYF6nB5joYY0rqAwsmTLf7crXRVKZ8txlatpxMHc20Rbw8RQDM6tTka9wwBkHZZfErrcPsS84d5MU9siEkIY42/bAQwuYhxkcgilttgFmSwzLodE72giC/VMhIYCSOyOXIxuR0VtBiPD9Inm9QZ35dZDx3P3nbnaOC4fk+BjdbrX1LB8YL9z5Gy/9TqI90w0FV85XMef75EnSgpqeMD/GMB5hIg+arWVnC2S6hZ91PPCcxCTKBYDjwqUac8mFW/sMFT/yrb2c0NE6ZQqa3dlx/XFyC1X6+7DjJli2Y8OU+FPjY1tQC8JxgVFTbddIgCdUM/5be4uHN/KNs/yF7w1g06ZXK4jhJxxpL4zWINlqDrDmLaqhAtPQkc2CqL3g8MCwYxBbxZY4aFyPfZD7YLdQXDzJZNcfXn9RQQh5y+/zexbGc1zZ/XUo5bK3VbElSs+o2ErI+Sze0FaiK8fW+QeitBdGvjMY7YVKi0Zzf5Dxx1wwxiHR1PQ1r0hA8YZQxwwdpa5lWLFlSVu2w+upPtXqfINMeFktQPbOs1JWIvUvLV0A38dS6R/DsM/W1a3OEVbHQ0Z6OV1nffDnGYPLUl5kRDPFuYYugmCpQHW73lqJdiM0O+Ote4eOQniL1rcajtt+V5cn1/JRWzdJ4PH0=
before_install:
- openssl aes-256-cbc -K $encrypted_13c010a56f48_key -iv $encrypted_13c010a56f48_iv
-in devices.json.enc -out src/test/resources/devices.json -d

88
CHECKPOINTING.md Normal file

@ -0,0 +1,88 @@
# Stream position checkpoint
The library provides a mechanism to restart the stream from a recent *checkpoint*, so that
processing is resilient to restarts and crashes.
*Checkpoints* are saved automatically, at a configurable frequency, to the configured storage backend.
For instance, the stream position can be saved every 15 seconds in Azure blobs, or
(soon) in a custom backend.
To store checkpoints in Azure blobs the configuration looks like this:
```
iothub-checkpointing {
enabled = true
frequency = 15s
countThreshold = 1000
timeThreshold = 30s
storage {
rwTimeout = 5s
backendType = "AzureBlob"
namespace = "iothub-react-checkpoints"
azureblob {
lease = 15s
useEmulator = false
protocol = "https"
account = "..."
key = "..."
}
}
}
```
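The reference configuration shipped in this same commit also includes a Cassandra backend; assuming the same keys shown there, an equivalent Cassandra setup would look like this (the cluster address and replication factor are example values):

```
iothub-checkpointing {
  enabled = true
  frequency = 15s
  countThreshold = 1000
  timeThreshold = 30s
  storage {
    rwTimeout = 5s
    backendType = "Cassandra"
    namespace = "iothub-react-checkpoints"
    cassandra {
      cluster = "localhost:9042"
      replicationFactor = 1
    }
  }
}
```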
Soon it will be possible to plug in custom storage backends implementing a simple
[interface](src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Backends/CheckpointBackend.scala)
to read and write the stream position.
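To give an idea of what such a backend involves, here is a minimal sketch. The actual `CheckpointBackend` interface is not shown in this commit, so the contract below is a hypothetical reconstruction for illustration only; the real trait in the repository may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the backend contract; the real CheckpointBackend
// interface in the repository may differ (it is not shown in this commit).
interface CheckpointBackend {
    String readOffset(int partition);               // last saved offset, or null if none

    void writeOffset(int partition, String offset); // persist the offset durably
}

// Minimal in-memory implementation, only to illustrate the contract;
// a real backend would write to Azure blobs or Cassandra.
class InMemoryCheckpointBackend implements CheckpointBackend {
    private final Map<Integer, String> offsets = new ConcurrentHashMap<>();

    public String readOffset(int partition) {
        return offsets.get(partition);
    }

    public void writeOffset(int partition, String offset) {
        offsets.put(partition, offset);
    }
}
```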
There is also an API parameter to enable/disable the mechanism, for example:
```scala
val start = java.time.Instant.now()
val withCheckpoints = false
IoTHub.source(start, withCheckpoints)
.map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature]))
.filter(_.value > 100)
.to(console)
.run()
```
# Checkpointing behavior
### Configuration
The following table describes the impact of the settings within the `iothub-checkpointing`
configuration block. For further information, you can also check the
[reference.conf](src/main/resources/reference.conf) file.
| Setting | Type | Example | Description |
|---------|------|---------|-------------|
| **enabled** | bool | true | Global switch to enable/disable the checkpointing feature. This value overrides the API parameter "withCheckpoints". |
| **frequency** | duration | 15s | How often to check if the offset in memory should be saved to storage. The check is scheduled for each partition individually. |
| **countThreshold** | int | 1000 | How many messages to stream before saving the position. The setting is applied to each partition individually. The value should be big enough to take into account buffering and batching. |
| **timeThreshold** | duration | 60s | In case of low traffic (i.e. when countThreshold is not reached), save any stream position older than this value.|
| storage.**rwTimeout** | duration | 5000ms | How long to wait, when writing to the storage, before triggering a storage timeout exception. |
| storage.**backendType** | string or class name | "AzureBlob" | Currently only "AzureBlob" is supported. The name of the backend, or the class FQDN, to use to write to the storage. This provides a way to inject custom storage logic. |
| storage.**namespace** | string | "mycptable" | The table/container which will contain the checkpoints data. When streaming data from multiple IoT hubs, you can use this setting to store each stream position in a common storage, but in separate tables/containers. |
### Runtime
The following table describes the system behavior, based on **API parameters** and stored **state**.
| Checkpointing | Start point | Saved position | Behavior |
|:---:|:---:|:-------:|---|
| No | No | No | The stream starts from the beginning
| No | No | **Yes** | The stream starts from the beginning (**the saved position is ignored**)
| No | Yes | No | The stream starts from the 'start point' provided
| No | Yes | **Yes** | The stream starts from the 'start point' provided (**the saved position is ignored**)
| Yes | No | No | The stream starts from the beginning
| Yes | No | **Yes** | **The stream starts from the saved position**
| Yes | Yes | No | The stream starts from the 'start point' provided
| Yes | Yes | **Yes** | **The stream starts from the saved position**
Legend:
* **Checkpointing**: whether checkpointing (saving the stream position) is enabled or disabled
* **Start point**: whether the client provides a starting position (date or offset) or asks for all
  the events from the beginning
* **Saved position**: whether there is a position saved in the storage
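The table above reduces to a simple decision rule; the sketch below encodes it (the names are illustrative, not part of the library API):

```java
// Encodes the runtime behavior table: given the API parameters and the
// stored state, return where the stream starts. Names are illustrative.
class StartPointResolver {
    static String resolve(boolean checkpointing, boolean startPoint, boolean savedPosition) {
        if (checkpointing && savedPosition) return "SAVED_POSITION"; // saved position wins
        if (startPoint) return "START_POINT";                        // client-provided point
        return "BEGINNING";                                          // default: all the events
    }
}
```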

Codestyle.IntelliJ.xml

@ -1,22 +1,22 @@
<code_scheme name="Toketi">
<option name="RIGHT_MARGIN" value="80" />
<option name="JD_ADD_BLANK_AFTER_RETURN" value="true" />
<ScalaCodeStyleSettings>
<option name="PLACE_CLOSURE_PARAMETERS_ON_NEW_LINE" value="true" />
<option name="ALIGN_IN_COLUMNS_CASE_BRANCH" value="true" />
<option name="USE_ALTERNATE_CONTINUATION_INDENT_FOR_PARAMS" value="true" />
<option name="SD_BLANK_LINE_AFTER_PARAMETERS_COMMENTS" value="true" />
<option name="SD_BLANK_LINE_AFTER_RETURN_COMMENTS" value="true" />
<option name="SD_BLANK_LINE_BEFORE_PARAMETERS" value="true" />
<option name="REPLACE_CASE_ARROW_WITH_UNICODE_CHAR" value="true" />
<option name="REPLACE_MAP_ARROW_WITH_UNICODE_CHAR" value="true" />
<option name="REPLACE_FOR_GENERATOR_ARROW_WITH_UNICODE_CHAR" value="true" />
</ScalaCodeStyleSettings>
<codeStyleSettings language="JAVA">
<option name="KEEP_FIRST_COLUMN_COMMENT" value="false" />
</codeStyleSettings>
<codeStyleSettings language="Scala">
<option name="ALIGN_MULTILINE_PARAMETERS" value="false" />
<option name="ALIGN_GROUP_FIELD_DECLARATIONS" value="true" />
</codeStyleSettings>
</code_scheme>
<option name="RIGHT_MARGIN" value="100"/>
<option name="JD_ADD_BLANK_AFTER_RETURN" value="true"/>
<ScalaCodeStyleSettings>
<option name="PLACE_CLOSURE_PARAMETERS_ON_NEW_LINE" value="true"/>
<option name="ALIGN_IN_COLUMNS_CASE_BRANCH" value="true"/>
<option name="USE_ALTERNATE_CONTINUATION_INDENT_FOR_PARAMS" value="true"/>
<option name="SD_BLANK_LINE_AFTER_PARAMETERS_COMMENTS" value="true"/>
<option name="SD_BLANK_LINE_AFTER_RETURN_COMMENTS" value="true"/>
<option name="SD_BLANK_LINE_BEFORE_PARAMETERS" value="true"/>
<option name="REPLACE_CASE_ARROW_WITH_UNICODE_CHAR" value="true"/>
<option name="REPLACE_MAP_ARROW_WITH_UNICODE_CHAR" value="true"/>
<option name="REPLACE_FOR_GENERATOR_ARROW_WITH_UNICODE_CHAR" value="true"/>
</ScalaCodeStyleSettings>
<codeStyleSettings language="JAVA">
<option name="KEEP_FIRST_COLUMN_COMMENT" value="false"/>
</codeStyleSettings>
<codeStyleSettings language="Scala">
<option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
<option name="ALIGN_GROUP_FIELD_DECLARATIONS" value="true"/>
</codeStyleSettings>
</code_scheme>

213
README.md

@ -1,28 +1,45 @@
[![Maven Central][maven-badge]][maven-url]
[![Bintray][bintray-badge]][bintray-url]
[![Build][build-badge]][build-url]
[![Issues][issues-badge]][issues-url]
[![Gitter][gitter-badge]][gitter-url]
# IoTHubReact
IoTHub React is an Akka Stream library that can be used to read data from
[Azure IoT Hub](https://azure.microsoft.com/en-us/services/iot-hub/), via a
reactive stream with asynchronous back pressure. Azure IoT Hub is a service
used to connect thousands to millions of devices to the Azure cloud.
[Azure IoT Hub](https://azure.microsoft.com/en-us/services/iot-hub/), via a reactive stream with
asynchronous back pressure. Azure IoT Hub is a service used to connect thousands to millions of
devices to the Azure cloud.
A simple example on how to use the library in Scala is the following:
A simple example on how to use the library is the following:
```scala
new IoTHub().source()
.map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature]))
IoTHub().source()
.map(m => parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
```
A stream of incoming telemetry data is read, parsed and converted to a
`Temperature` object and filtered based on the temperature value.
and the equivalent code in Java:
```java
TypeReference<Temperature> type = new TypeReference<Temperature>() {};
new IoTHub().source()
.map(m -> (Temperature) jsonParser.readValue(m.contentAsString(), type))
.filter(x -> x.value > 100)
.to(console())
.run(streamMaterializer);
```
A stream of incoming telemetry data is read, parsed and converted to a `Temperature` object and
filtered based on the temperature value.
#### Streaming from IoT hub to _any_
A more interesting example is reading telemetry data from Azure IoTHub, and sending it to a Kafka
topic, so it can be consumed by other services downstream:
A more interesting example is reading telemetry data from Azure IoTHub, and
sending it to a Kafka topic, so it can be consumed by other services downstream:
```scala
...
import org.apache.kafka.common.serialization.StringSerializer
@ -47,32 +64,102 @@ case class KafkaProducer(bootstrapServer: String)(implicit val system: ActorSyst
```scala
val kafkaProducer = KafkaProducer(bootstrapServer)
IoTHub.source()
.map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature]))
IoTHub().source()
.map(m => parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.runWith(kafkaProducer.getSink())
```
## Source options
### IoT hub partitions
The library supports also reading from a single
[IoTHub partition](https://azure.microsoft.com/en-us/documentation/articles/event-hubs-overview),
so a service that consumes IoTHub events can create multiple streams and
process them independently.
so a service that consumes IoTHub events can create multiple streams and process them independently.
```scala
val partitionNumber = 1
IoTHub.source(partitionNumber)
.map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature]))
.map(m => parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
```
### Starting point
Unless specified, the stream starts from the beginning of the data present in each partition.
It's possible to start the stream from a given date and time too:
```scala
val start = java.time.Instant.now()
IoTHub.source(start)
.map(m => parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
```
### Stream processing restart, saving the current position
The library provides a mechanism to restart the stream from a recent *checkpoint*, to be resilient
to restarts and crashes.
*Checkpoints* are saved automatically, with a configured frequency, on a storage provided.
For instance, the stream position can be saved every 15 seconds, in a table in Cassandra, on Azure
blobs, or a custom backend.
To store checkpoints in Azure blobs the configuration looks like this:
```
iothub-checkpointing {
enabled = true
frequency = 15s
countThreshold = 1000
timeThreshold = 30s
storage {
rwTimeout = 5s
backendType = "AzureBlob"
namespace = "iothub-react-checkpoints"
azureblob {
lease = 15s
useEmulator = false
protocol = "https"
account = "..."
key = "..."
}
}
}
```
There are some [configuration parameters](src/main/resources/reference.conf) to manage the
checkpoint behavior, and soon it will also be possible to plug-in custom storage backends,
implementing a simple
[interface](src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Backends/CheckpointBackend.scala)
to read and write the stream position.
There is also one API parameter to enabled/disable the mechanism, for example:
```scala
val start = java.time.Instant.now()
val withCheckpoints = false
IoTHub.source(start, withCheckpoints)
.map(m => parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
```
## Build configuration
IoTHubReact is available on Maven Central, you just need to add the following
reference in your `build.sbt` file:
IoTHubReact is available on Maven Central, you just need to add the following reference in
your `build.sbt` file:
```scala
libraryDependencies ++= {
val iothubReactV = "0.6.0"
val iothubReactV = "0.7.0"
Seq(
"com.microsoft.azure.iot" %% "iothub-react" % iothubReactV
@ -86,24 +173,23 @@ or this dependency in `pom.xml` file if working with Maven:
<dependency>
<groupId>com.microsoft.azure.iot</groupId>
<artifactId>iothub-react_2.11</artifactId>
<version>0.6.0</version>
<version>0.7.0</version>
</dependency>
```
### IoTHub configuration
IoTHubReact uses a configuration file to fetch the parameters required to
connect to Azure IoTHub.
IoTHubReact uses a configuration file to fetch the parameters required to connect to Azure IoTHub.
The exact values to use can be found in the [Azure Portal](https://portal.azure.com):
* **namespace**: it is the first part of the _Event Hub-compatible endpoint_,
which usually has this format: `sb://<namespace>.servicebus.windows.net/`
* **namespace**: it is the first part of the _Event Hub-compatible endpoint_, which usually has
this format: `sb://<namespace>.servicebus.windows.net/`
* **name**: see _Event Hub-compatible name_
* **keyname**: usually the value is `service`
* **key**: the Primary Key can be found under
_shared access policies/service_ policy (it's a base64 encoded string)
* **key**: the Primary Key can be found under _shared access policies/service_ policy (it's a
base64 encoded string)
The values should be stored in your `application.conf` resource (or equivalent),
which can reference environment settings if you prefer.
The values should be stored in your `application.conf` resource (or equivalent), which can
reference environment settings if you prefer.
```
iothub {
@ -132,47 +218,66 @@ There are other settings, to tune performance and connection details:
* **consumerGroup**: the
[consumer group](https://azure.microsoft.com/en-us/documentation/articles/event-hubs-overview)
used during the connection
* **receiverBatchSize**: the number of messages retrieved on each call to
Azure IoT hub. The default (an maximum) value is 999.
* **receiverTimeout**: timeout applied to calls while retrieving messages. The
default value is 3 seconds.
* **receiverBatchSize**: the number of messages retrieved on each call to Azure IoT hub. The
default (and maximum) value is 999.
* **receiverTimeout**: timeout applied to calls while retrieving messages. The default value is
3 seconds.
The complete configuration reference is available in
[reference.conf](src/main/resources/reference.conf).
## Samples
In order to run the samples you need to have either the IoTHub settings defined
via environment variables, or in the config file. Follow the instructions in
the previous section on how to set the correct values.
The project includes 4 demos, showing some of the use cases and how the IoTHub React API works.
All the demos require an instance of Azure IoT hub, with some devices, and messages.
1. **DisplayMessages** [Java]: how to stream from Azure IoT hub within a Java application, filtering
   temperature values greater than 60C
2. **OutputMessagesToConsole** [Scala]: stream all Temperature events to console
3. **MessagesThroughput** [Scala]: stream all IoT hub messages, showing the current speed, and
   optionally throttling the speed to 200 msg/sec
4. **Checkpoints** [Scala]: demonstrate how the stream can be restarted without losing its position.
   The current position is stored in a Cassandra table (we suggest running a Docker container for
   the purpose of the demo, e.g. `docker run -ip 9042:9042 --rm cassandra`)
We provide a [device simulator](tools/devices-simulator/README.md) in the tools section,
which will help setting up these requirements.
When ready, you should either edit the `application.conf` configuration files
([scala](samples-scala/src/main/resources/application.conf) and
[java](samples-java/src/main/resources/application.conf))
with your credentials, or set the corresponding global variables.
Follow the instructions in the previous section on how to set the correct values.
* [`samples-scala`](samples-scala/src/main/scala):
You can use `sbt run` to run the demos.
You can use `sbt run` to run the demos (or the `run_samples.*` scripts)
* [`samples-java`](samples-java/src/main/java):
You can use
`mvn clean compile exec:java -Dexec.mainClass="DisplayMessages.Demo"`
to run the demo app.
You can use `mvn clean compile exec:java -Dexec.mainClass="DisplayMessages.Demo"` to run the
demo app (or the `run_samples.*` scripts)
## Future work
* improve asynchronicity by using EventHub SDK async APIs
* add support for checkpointing so the library can be used in scenarios where
the consuming service needs to be resilient to crashes/restarts
* add Sink for Cloud2Device scenarios. `IoTHub.Sink` will allow cloud services
to send messages to devices (via Azure IoTHub)
* add Sink for Cloud2Device scenarios. `IoTHub.Sink` will allow cloud services to send messages
to devices (via Azure IoTHub)
# Contribute Code
If you want/plan to contribute, we ask you to sign a
[CLA](https://cla.microsoft.com/) (Contribution license Agreement). A friendly
bot will remind you about it when you submit a pull-request.
If you are sending a pull request, we kindly request to check the code style
with IntelliJ IDEA, importing the settings from `Codestyle.IntelliJ.xml`.
If you want/plan to contribute, we ask you to sign a [CLA](https://cla.microsoft.com/)
(Contribution license Agreement). A friendly bot will remind you about it when you submit
a pull-request.
If you are sending a pull request, we kindly request to check the code style with IntelliJ IDEA,
importing the settings from `Codestyle.IntelliJ.xml`.
[maven-badge]: https://img.shields.io/maven-central/v/com.microsoft.azure.iot/iothub-react_2.11.svg
[maven-url]: http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22iothub-react_2.11%22
[bintray-badge]: https://img.shields.io/bintray/v/microsoftazuretoketi/toketi-repo/iothub-react.svg?maxAge=2592000
[bintray-badge]: https://img.shields.io/bintray/v/microsoftazuretoketi/toketi-repo/iothub-react.svg
[bintray-url]: https://bintray.com/microsoftazuretoketi/toketi-repo/iothub-react
[build-badge]: https://img.shields.io/travis/Azure/toketi-iothubreact.svg
[build-url]: https://travis-ci.org/Azure/toketi-iothubreact
[issues-badge]: https://img.shields.io/github/issues/azure/toketi-iothubreact.svg?style=flat-square
[issues-url]: https://github.com/azure/toketi-iothubreact/issues
[gitter-badge]: https://img.shields.io/gitter/room/azure/toketi-repo.js.svg?maxAge=2592000
[gitter-badge]: https://img.shields.io/gitter/room/azure/toketi-repo.js.svg
[gitter-url]: https://gitter.im/azure-toketi/iothub-react

build.sbt

@ -2,41 +2,47 @@
name := "iothub-react"
organization := "com.microsoft.azure.iot"
version := "0.6.0"
version := "0.7.0"
scalaVersion := "2.11.8"
// @todo Akka version depends on Scala version (2.10 => 2.3.15, 2.11 => 2.4.9)
//crossScalaVersions := Seq("2.10.6", "2.11.8")
crossScalaVersions := Seq("2.11.8", "2.12.0-RC1")
logLevel := Level.Warn // Debug|Info|Warn|Error
scalacOptions ++= Seq("-deprecation", "-explaintypes", "-optimise",
"-unchecked", "-verbose")
scalacOptions ++= Seq("-deprecation", "-explaintypes", "-unchecked", "-feature")
libraryDependencies ++= {
val akkaStreamVersion = "2.4.9"
val azureEventHubSDKVersion = "0.8.2"
libraryDependencies <++= (scalaVersion) {
scalaVersion ⇒
val azureEventHubSDKVersion = "0.8.2"
val azureStorageSDKVersion = "4.4.0"
val iothubClientVersion = "1.0.14"
val scalaTestVersion = "3.0.0"
val jacksonVersion = "2.8.3"
val akkaStreamVersion = "2.4.11"
val datastaxDriverVersion = "3.0.2"
val json4sVersion = "3.4.1"
Seq(
// Library dependencies
"com.typesafe.akka" %% "akka-stream" % akkaStreamVersion,
"com.microsoft.azure" % "azure-eventhubs" % azureEventHubSDKVersion,
Seq(
// Library dependencies
"com.typesafe.akka" %% "akka-stream" % akkaStreamVersion,
"com.microsoft.azure" % "azure-eventhubs" % azureEventHubSDKVersion,
"com.microsoft.azure" % "azure-storage" % azureStorageSDKVersion,
"com.datastax.cassandra" % "cassandra-driver-core" % datastaxDriverVersion,
"org.json4s" %% "json4s-native" % json4sVersion,
"org.json4s" %% "json4s-jackson" % json4sVersion,
// Tests dependencies
"org.scalatest" %% "scalatest" % "3.0.0" % "test",
"com.microsoft.azure.iothub-java-client" % "iothub-java-device-client" % "1.0.14" % "test",
"com.jsuereth" %% "scala-arm" % "1.4" % "test",
// Tests dependencies
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"com.microsoft.azure.iothub-java-client" % "iothub-java-device-client" % iothubClientVersion % "test",
// Remove < % "test" > to run samples-scala against the local workspace
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.3" % "test",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.8.2" % "test"
)
// Remove < % "test" > to run samples-scala against the local workspace
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion % "test",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion % "test"
)
}
lazy val root = project.in(file(".")).configs(IntegrationTest)
/*
* Publishing options
/* Publishing options
* see http://www.scala-sbt.org/0.13/docs/Artifacts.html
*/
publishArtifact in Test := true
@ -53,3 +59,9 @@ bintrayOrganization := Some("microsoftazuretoketi")
bintrayRepository := "toketi-repo"
bintrayPackage := "iothub-react"
bintrayReleaseOnPublish in ThisBuild := true
// Required in Sonatype
pomExtra :=
<url>https://github.com/Azure/toketi-iothubreact</url>
<scm><url>https://github.com/Azure/toketi-iothubreact</url></scm>
<developers><developer><id>microsoft</id><name>Microsoft</name></developer></developers>

Binary data
devices.json.enc Normal file

Binary file not shown.

samples-java/pom.xml

@ -6,7 +6,7 @@
<groupId>com.microsoft.azure.iot</groupId>
<artifactId>iothub-react-demo</artifactId>
<version>0.6.0</version>
<version>0.7.0</version>
<repositories>
<repository>
@ -19,17 +19,12 @@
<dependency>
<groupId>com.microsoft.azure.iot</groupId>
<artifactId>iothub-react_2.11</artifactId>
<version>0.6.0</version>
<version>0.7.0</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.11</artifactId>
<version>2.4.9</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.9</version>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.3</version>
</dependency>
</dependencies>

2
samples-java/run_samples.cmd Executable file

@ -0,0 +1,2 @@
mvn clean compile exec:java -Dexec.mainClass="DisplayMessages.Demo"

2
samples-java/run_samples.sh Executable file

@ -0,0 +1,2 @@
mvn clean compile exec:java -Dexec.mainClass="DisplayMessages.Demo"


@ -6,33 +6,59 @@ import akka.Done;
import akka.NotUsed;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.microsoft.azure.iot.iothubreact.javadsl.IoTHub;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.iot.iothubreact.IoTMessage;
import com.microsoft.azure.iot.iothubreact.javadsl.IoTHub;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import static java.lang.System.out;
/**
* Retrieve messages from IoT hub and display the data in the console
*/
public class Demo extends ReactiveStreamingApp {
static Source<IoTMessage, NotUsed> messagesFromAllPartitions;
static ObjectMapper jsonParser = new ObjectMapper();
public static void main(String args[]) {
// Source retrieving messages from one IoT hub partition (0 to N-1, where N is
// defined at deployment time)
//Source messagesFromOnePartition = new IoTHub().source(PARTITION);
// Source retrieving messages from one IoT hub partition (e.g. partition 2)
//Source<IoTMessage, NotUsed> messages = new IoTHubPartition(2).source();
// Source retrieving messages from all IoT hub partitions
Source messagesFromAllPartitions = new IoTHub().source();
// Source retrieving from all IoT hub partitions for the past 24 hours
Source<IoTMessage, NotUsed> messages = new IoTHub().source(Instant.now().minus(1, ChronoUnit.DAYS));
messagesFromAllPartitions
messages
.filter(m -> m.model().equals("temperature"))
.map(m -> parseTemperature(m))
.filter(x -> x != null && (x.value < 18 || x.value > 22))
.to(console())
.run(streamMaterializer);
}
public static Sink<IoTMessage, CompletionStage<Done>> console() {
return Sink.foreach(m -> System.out.println(m.deviceId() + ": " + m.contentAsString()));
public static Sink<Temperature, CompletionStage<Done>> console() {
return Sink.foreach(m -> {
if (m.value <= 18) {
out.println("Device: " + m.deviceId + ": temperature too LOW: " + m.value);
} else {
out.println("Device: " + m.deviceId + ": temperature too HIGH: " + m.value);
}
});
}
public static Temperature parseTemperature(IoTMessage m) {
try {
Map<String, Object> hash = jsonParser.readValue(m.contentAsString(), Map.class);
Temperature t = new Temperature();
t.value = Double.parseDouble(hash.get("value").toString());
t.deviceId = m.deviceId();
return t;
} catch (Exception e) {
return null;
}
}
}


@ -0,0 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
package DisplayMessages;
public class Temperature {
String deviceId;
Double value;
}


@ -24,7 +24,9 @@ iothub {
// see: "IoT Hub" >> your hub > "Messaging" >> Consumer groups
// "$Default" is predefined and is the typical scenario
consumerGroup = "$Default"
}
iothub-stream {
// Value expressed as a duration, e.g. 3s, 3000ms, "3 seconds", etc.
receiverTimeout = 3s
@ -32,6 +34,56 @@ iothub {
receiverBatchSize = 999
}
// IoT hub stream checkpointing options
iothub-checkpointing {
// Whether the checkpointing feature is enabled
enabled = true
// Checkpoints frequency (best effort), for each IoT hub partition
// Min: 1 second, Max: 1 minute
frequency = 10s
// How many messages to stream before saving the position, for each IoT hub partition.
// Since the stream position is saved in the Source, before the rest of the
// Graph (Flows/Sinks), this provides a mechanism to replay buffered messages.
// The value should be greater than receiverBatchSize
countThreshold = 5
// Store a position if its value is older than this amount of time, ignoring the threshold.
// For instance, when the telemetry stops, this forces writing the last offset after some time.
// Min: 1 second, Max: 1 hour. Value is rounded to seconds.
timeThreshold = 30s
storage {
// Value expressed as a duration, e.g. 3s, 3000ms, "3 seconds", etc.
rwTimeout = 5s
backendType = "Cassandra"
// If you use the same storage while processing multiple streams, you'll want
// to use a distinct table/container/path in each job, to keep state isolated
namespace = "iothub-react-checkpoints"
azureblob {
// Time allowed for a checkpoint to be written, rounded to seconds (min 15, max 60)
lease = 15s
// Whether to use the Azure Storage Emulator
useEmulator = false
// Storage credentials
protocol = "https"
account = ${?IOTHUB_CHECKPOINT_ACCOUNT}
key = ${?IOTHUB_CHECKPOINT_KEY}
}
// You can easily test this with Docker --> docker run -ip 9042:9042 --rm cassandra
cassandra {
cluster = "localhost:9042"
replicationFactor = 1
}
}
}
// @see http://doc.akka.io/docs/akka/2.4.10/scala/logging.html
akka {
# Options: OFF, ERROR, WARNING, INFO, DEBUG


@ -1,21 +1,18 @@
// Copyright (c) Microsoft. All rights reserved.
scalaVersion := "2.11.8"
crossScalaVersions := Seq("2.11.8", "2.12.0-RC1")
scalacOptions ++= Seq("-deprecation", "-explaintypes", "-unchecked", "-feature")
// This repository contains development snapshots. Production releases are in Maven Central.
resolvers += "Toketi Dev Snapshots" at "https://dl.bintray.com/microsoftazuretoketi/toketi-repo"
resolvers += "Dev Snapshots" at "https://dl.bintray.com/microsoftazuretoketi/toketi-repo"
libraryDependencies ++= {
val prodVersion = "0.6.0"
val devVersion = "0.6.0-dev"
val prodVersion = "0.7.0"
val devVersion = "0.7.0-DEV.161025c"
Seq(
"com.microsoft.azure.iot" %% "iothub-react" % prodVersion,
// Jackson libraries for JSON marshalling and unmarshalling
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.3",
// Jackson module for scala object marshalling and unmarshalling
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.8.2"
"com.microsoft.azure.iot" %% "iothub-react" % prodVersion
)
}

1
samples-scala/run_samples.cmd Executable file

@ -0,0 +1 @@
sbt run

1
samples-scala/run_samples.sh Executable file

@ -0,0 +1 @@
sbt run


@ -24,7 +24,9 @@ iothub {
// see: "IoT Hub" >> your hub > "Messaging" >> Consumer groups
// "$Default" is predefined and is the typical scenario
consumerGroup = "$Default"
}
iothub-stream {
// Value expressed as a duration, e.g. 3s, 3000ms, "3 seconds", etc.
receiverTimeout = 3s
@ -32,6 +34,56 @@ iothub {
receiverBatchSize = 999
}
// IoT hub stream checkpointing options
iothub-checkpointing {
// Whether the checkpointing feature is enabled
enabled = true
// Checkpoints frequency (best effort), for each IoT hub partition
// Min: 1 second, Max: 1 minute
frequency = 10s
// How many messages to stream before saving the position, for each IoT hub partition.
// Since the stream position is saved in the Source, before the rest of the
// Graph (Flows/Sinks), this provides a mechanism to replay buffered messages.
// The value should be greater than receiverBatchSize
countThreshold = 5
// Store a position if its value is older than this amount of time, ignoring the threshold.
// For instance, when the telemetry stops, this forces writing the last offset after some time.
// Min: 1 second, Max: 1 hour. Value is rounded to seconds.
timeThreshold = 30s
storage {
// Value expressed as a duration, e.g. 3s, 3000ms, "3 seconds", etc.
rwTimeout = 5s
backendType = "Cassandra"
// If you use the same storage while processing multiple streams, you'll want
// to use a distinct table/container/path in each job, to keep state isolated
namespace = "iothub-react-checkpoints"
azureblob {
// Time allowed for a checkpoint to be written, rounded to seconds (min 15, max 60)
lease = 15s
// Whether to use the Azure Storage Emulator
useEmulator = false
// Storage credentials
protocol = "https"
account = ${?IOTHUB_CHECKPOINT_ACCOUNT}
key = ${?IOTHUB_CHECKPOINT_KEY}
}
// You can easily test this with Docker --> docker run -ip 9042:9042 --rm cassandra
cassandra {
cluster = "localhost:9042"
replicationFactor = 1
}
}
}
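With checkpointing enabled as above, a stream resumes from the stored position on restart. A minimal consumer sketch using the scaladsl API from the samples (`Sink.ignore` is a placeholder destination):

```scala
import akka.stream.scaladsl.Sink
import com.microsoft.azure.iot.iothubreact.ResumeOnError._
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHub

// The saved offsets (Cassandra in this configuration) are loaded by the Source
IoTHub().source(withCheckpoints = true)
  .to(Sink.ignore)
  .run()
```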
// @see http://doc.akka.io/docs/akka/2.4.10/scala/logging.html
akka {
# Options: OFF, ERROR, WARNING, INFO, DEBUG


@ -0,0 +1,45 @@
// Copyright (c) Microsoft. All rights reserved.
package A_OutputMessagesToConsole
import akka.stream.scaladsl.Sink
import com.microsoft.azure.iot.iothubreact.filters.Model
import com.microsoft.azure.iot.iothubreact.scaladsl.{IoTHub, IoTHubPartition}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import com.microsoft.azure.iot.iothubreact.ResumeOnError._
import scala.language.{implicitConversions, postfixOps}
/** Retrieve messages from IoT hub and display the data sent from Temperature devices
*/
object Demo extends App {
// Source retrieving messages from one IoT hub partition (e.g. partition 2)
//val messagesFromOnePartition = IoTHubPartition(2).source()
// Source retrieving only recent messages
//val messagesFromNowOn = IoTHub().source(java.time.Instant.now())
// Source retrieving messages from all IoT hub partitions
val messagesFromAllPartitions = IoTHub().source()
// Sink printing to the console
val console = Sink.foreach[Temperature] {
t ⇒ println(s"Device ${t.deviceId}: temperature: ${t.value}C ; T=${t.datetime}")
}
// JSON parser setup, brings in default date formats etc.
implicit val formats = DefaultFormats
// Stream
messagesFromAllPartitions
.filter(Model("temperature"))
.map(m ⇒ {
val temperature = parse(m.contentAsString).extract[Temperature]
temperature.deviceId = m.deviceId
temperature
})
.to(console)
.run()
}


@ -1,34 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.
package OutputMessagesToConsole
package A_OutputMessagesToConsole
import java.time.{ZoneId, ZonedDateTime}
/** Temperature measured by a device
/** ISO8601 with and without milliseconds decimals
*
* @param deviceId Device ID passed by the device (ideally this matches the ID registered in IoTHub)
* @param value Temperature value measured by the device
* @param time Time (as a string) when the device measured the temperature
* @param text String date
*/
class Temperature(var deviceId: String,
val value: Float,
val time: String) {
// ISO8601 with and without milliseconds decimals
case class ISO8601DateTime(text: String) {
private val pattern1 = """(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+).(\d+)Z""".r
private val pattern2 = """(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)Z""".r
/** Parse the time sent by the device.
*
* @return the time as an object
*/
def getTime(): ZonedDateTime = {
time match {
val value: ZonedDateTime = {
text match {
case pattern1(y, m, d, h, i, s, n) ⇒ ZonedDateTime.of(y.toInt, m.toInt, d.toInt, h.toInt, i.toInt, s.toInt, n.toInt * 1000000, ZoneId.of("UTC"))
case pattern2(y, m, d, h, i, s) ⇒ ZonedDateTime.of(y.toInt, m.toInt, d.toInt, h.toInt, i.toInt, s.toInt, 0, ZoneId.of("UTC"))
case null ⇒ null
case _ ⇒ throw new Exception(s"wrong date time format: $time")
case _ ⇒ throw new Exception(s"wrong date time format: $text")
}
}
override def toString: String = if (value == null) "" else value.toString
}
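A quick sketch of how the case class above parses the two supported formats (the sample timestamps are hypothetical):

```scala
val withMillis = ISO8601DateTime("2016-10-26T16:13:10.123Z")
val noMillis   = ISO8601DateTime("2016-10-26T16:13:10Z")

withMillis.value.getNano // 123000000 (123 ms expressed as nanoseconds)
noMillis.value.getNano   // 0
```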


@ -0,0 +1,17 @@
// Copyright (c) Microsoft. All rights reserved.
package A_OutputMessagesToConsole
import java.time.{ZoneId, ZonedDateTime}
/** Temperature measured by a device
*
* @param value Temperature value measured by the device
* @param time Time (as a string) when the device measured the temperature
*/
case class Temperature(value: Float, time: String) {
var deviceId: String = ""
val datetime = ISO8601DateTime(time)
}


@ -1,14 +1,15 @@
// Copyright (c) Microsoft. All rights reserved.
package MessagesThroughput
package B_MessagesThroughput
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Flow, Sink}
import com.microsoft.azure.iot.iothubreact.IoTMessage
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHub
import com.microsoft.azure.iot.iothubreact.ResumeOnError._
import scala.concurrent.duration._
import scala.io.StdIn
import scala.language.postfixOps
/** Retrieve messages from IoT hub managing the stream velocity
*
@ -18,7 +19,7 @@ import scala.io.StdIn
* - How to combine multiple destinations
* - Back pressure
*/
object Demo extends App with ReactiveStreaming {
object Demo extends App {
// Maximum speed allowed
val maxSpeed = 200
@ -26,7 +27,7 @@ object Demo extends App with ReactiveStreaming {
val showStatsEvery = 1 second
print(s"Do you want to test throttling (${maxSpeed} msg/sec) ? [y/N] ")
val input = StdIn.readLine()
val input = scala.io.StdIn.readLine()
val throttling = input.size > 0 && input(0).toUpper == 'Y'
// Stream throttling sink
@ -50,11 +51,11 @@ object Demo extends App with ReactiveStreaming {
// Start processing the stream
if (throttling) {
new IoTHub().source
IoTHub().source
.to(throttleAndMonitor)
.run()
} else {
new IoTHub().source
IoTHub().source
.to(monitor)
.run()
}


@ -1,12 +1,13 @@
// Copyright (c) Microsoft. All rights reserved.
package MessagesThroughput
package B_MessagesThroughput
import com.typesafe.config.ConfigFactory
import scala.collection.parallel.mutable.ParArray
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration, _}
import scala.language.postfixOps
/** Monitoring logic, some properties to keep count and a method to print the
* statistics.


@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.
package C_Checkpoints
import akka.stream.scaladsl.Sink
import com.microsoft.azure.iot.iothubreact.IoTMessage
import com.microsoft.azure.iot.iothubreact.ResumeOnError._
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHub
/** Retrieve messages from IoT hub and save the current position
* In case of restart the stream starts from where it left off
* (depending on the configuration)
*
* Note, the demo requires Cassandra, you can start an instance with Docker:
* # docker run -ip 9042:9042 --rm cassandra
*/
object Demo extends App {
// Sink printing to the console
val console = Sink.foreach[IoTMessage] {
t ⇒ println(s"Message from ${t.deviceId} - Time: ${t.created}")
}
// Stream
IoTHub().source(withCheckpoints = true)
.filter(m ⇒ m.deviceId == "device1000")
.to(console)
.run()
}


@ -1,21 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
package MessagesThroughput
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
/** Initialize reactive streaming
*
* @todo Don't use supervisor with Akka streams
*/
trait ReactiveStreaming {
val decider: Supervision.Decider = {
case e: Exception ⇒
println(e.getMessage)
Supervision.Resume
}
implicit val actorSystem = ActorSystem("Demo")
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
}


@ -1,44 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
package OutputMessagesToConsole
import akka.stream.scaladsl.Sink
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHub
/** Retrieve messages from IoT hub and display the data sent from Temperature
* devices
*/
object Demo extends App with ReactiveStreaming {
// Source retrieving messages from all IoT hub partitions
val messagesFromAllPartitions = new IoTHub().source()
// Source retrieving only recent messages
// import java.time.Instant
// val messagesFromNowOn = new IoTHub().source(Instant.now())
// Source retrieving messages from one IoT hub partition (0 to N-1, where N is
// defined at deployment time)
// val messagesFromOnePartition = new IoTHub().source(PARTITION)
// Sink printing to the console
val console = Sink.foreach[Temperature] {
t ⇒ println(s"Device ${t.deviceId}: temperature: ${t.value}F ; T=${t.time}")
}
// JSON parser setup
val jsonParser = new ObjectMapper()
jsonParser.registerModule(DefaultScalaModule)
// Start processing the stream
messagesFromAllPartitions
.map(m ⇒ {
val temperature = jsonParser.readValue(m.contentAsString, classOf[Temperature])
temperature.deviceId = m.deviceId
temperature
})
.to(console)
.run()
}


@ -1,21 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
package OutputMessagesToConsole
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
/** Initialize reactive streaming
*
* @todo Don't use supervisor with Akka streams
*/
trait ReactiveStreaming {
val decider: Supervision.Decider = {
case e: Exception ⇒
println(e.getMessage)
Supervision.Resume
}
implicit val actorSystem = ActorSystem("Demo")
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
}


@ -24,7 +24,9 @@ iothub {
// see: "IoT Hub" >> your hub > "Messaging" >> Consumer groups
// "$Default" is predefined and is the typical scenario
consumerGroup = "$Default"
}
iothub-stream {
// Value expressed as a duration, e.g. 3s, 3000ms, "3 seconds", etc.
receiverTimeout = 3s
@ -32,8 +34,52 @@ iothub {
receiverBatchSize = 999
}
// @see http://doc.akka.io/docs/akka/2.4.10/scala/logging.html
akka {
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "ERROR"
// IoT hub stream checkpointing options
iothub-checkpointing {
// Whether the checkpointing feature is enabled
enabled = false
// Checkpoints frequency (best effort), for each IoT hub partition
// Min: 1 second, Max: 1 minute
frequency = 15s
// How many messages to stream before saving the position, for each IoT hub partition.
// Since the stream position is saved in the Source, before the rest of the
// Graph (Flows/Sinks), this provides a mechanism to replay buffered messages.
// The value should be greater than receiverBatchSize
countThreshold = 2000
// Store a position if its value is older than this amount of time, ignoring the threshold.
// For instance when the telemetry stops, this will force to write the last offset after some time.
// Min: 1 second, Max: 1 hour. Value is rounded to seconds.
timeThreshold = 5min
storage {
// Value expressed as a duration, e.g. 3s, 3000ms, "3 seconds", etc.
rwTimeout = 5s
backendType = "AzureBlob"
// If you use the same storage while processing multiple streams, you'll want
// to use a distinct table/container/path in each job, to keep state isolated
namespace = "iothub-react-checkpoints"
// com.microsoft.azure.iot.iothubreact.checkpointing.Backends.AzureBlob
azureblob {
// Time allowed for a checkpoint to be written, rounded to seconds (min 15, max 60)
lease = 15s
// Whether to use the Azure Storage Emulator
useEmulator = false
// Storage credentials
protocol = "https"
account = ${?IOTHUB_CHECKPOINT_ACCOUNT}
key = ${?IOTHUB_CHECKPOINT_KEY}
}
cassandra {
cluster = "localhost:9042"
replicationFactor = 1
}
}
}


@ -2,22 +2,27 @@
package com.microsoft.azure.iot.iothubreact
import java.util.concurrent.TimeUnit
import com.microsoft.azure.eventhubs.EventHubClient
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.duration._
import scala.language.postfixOps
/** Hold IoT Hub configuration settings
*
* @see https://github.com/typesafehub/config for information about the
* configuration file formats
* @todo dependency injection
*/
private object Configuration {
private[iothubreact] object Configuration {
// Maximum size supported by the client
private[this] val MaxBatchSize = 999
// Maximum size supported by the client
private[this] val DefaultReceiverTimeout = 3000
// Default IoTHub client timeout
private[this] val DefaultReceiverTimeout = 3 seconds
private[this] val conf: Config = ConfigFactory.load()
@ -41,15 +46,15 @@ private object Configuration {
}
// Message retrieval timeout in milliseconds
private[this] val tmpRTO = conf.getDuration("iothub.receiverTimeout").toMillis
val receiverTimeout: Long =
private[this] val tmpRTO = conf.getDuration("iothub-stream.receiverTimeout").toMillis
val receiverTimeout: FiniteDuration =
if (tmpRTO > 0)
tmpRTO
FiniteDuration(tmpRTO, TimeUnit.MILLISECONDS)
else
DefaultReceiverTimeout
// How many messages to retrieve on each call to the storage
private[this] val tmpRBS = conf.getInt("iothub.receiverBatchSize")
private[this] val tmpRBS = conf.getInt("iothub-stream.receiverBatchSize")
val receiverBatchSize: Int =
if (tmpRBS > 0 && tmpRBS <= MaxBatchSize)
tmpRBS


@ -5,7 +5,7 @@ package com.microsoft.azure.iot.iothubreact
import com.microsoft.azure.eventhubs.EventHubClient
import com.microsoft.azure.servicebus.ConnectionStringBuilder
private object IoTHubStorage {
private object IoTHubStorage extends Logger {
private[this] val connString = new ConnectionStringBuilder(
Configuration.iotHubNamespace,
@ -16,5 +16,8 @@ private object IoTHubStorage {
// @todo Manage transient errors e.g. timeouts
// EventHubClient.createFromConnectionString(connString)
// .get(Configuration.receiverTimeout, TimeUnit.MILLISECONDS)
def createClient(): EventHubClient = EventHubClient.createFromConnectionStringSync(connString)
def createClient(): EventHubClient = {
log.debug(s"Creating EventHub client to ${Configuration.iotHubName}")
EventHubClient.createFromConnectionStringSync(connString)
}
}


@ -3,6 +3,7 @@
package com.microsoft.azure.iot.iothubreact
import java.time.Instant
import java.util
import com.microsoft.azure.eventhubs.EventData
@ -18,7 +19,7 @@ private object IoTMessage {
* @return
*/
def apply(rawData: EventData, partition: Option[Int]): IoTMessage = {
new IoTMessage(rawData, partition)
new IoTMessage(Some(rawData), partition)
}
}
@ -26,10 +27,21 @@ private object IoTMessage {
*
* @param partition Storage partition where the message was retrieved from
*/
class IoTMessage(data: EventData, val partition: Option[Int]) {
class IoTMessage(data: Option[EventData], val partition: Option[Int]) {
// Internal properties set by IoT storage
private[this] lazy val systemProps = data.getSystemProperties()
private[this] lazy val systemProps = data.get.getSystemProperties()
// Meta properties set by the device
// Note: empty when using MQTT
lazy val properties: util.Map[String, String] = data.get.getProperties()
// Whether this is a keep alive message generated by the stream and not by IoT hub
val isKeepAlive: Boolean = (partition == None)
// Content type, e.g. how to interpret/deserialize the content
// Note: empty when using MQTT
lazy val model: String = properties.getOrDefault("model", "")
/** Time when the message was received by IoT hub service
* Note that this might differ from the time set by the device, e.g. in case
@ -49,7 +61,7 @@ class IoTMessage(data: EventData, val partition: Option[Int]) {
lazy val deviceId: String = systemProps.get("iothub-connection-device-id").toString
// IoT message content bytes
lazy val content: Array[Byte] = data.getBody
lazy val content: Array[Byte] = data.get.getBody
// IoT message content as string, e.g. JSON/XML/etc.
lazy val contentAsString: String = new String(content)


@ -8,9 +8,11 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import com.microsoft.azure.eventhubs.{EventData, PartitionReceiver}
import com.microsoft.azure.eventhubs.PartitionReceiver
import scala.collection.convert.decorateAsScala._
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
private object IoTMessageSource {
@ -22,8 +24,8 @@ private object IoTMessageSource {
* @return A source returning the body of the message sent from a device.
* Deserialization is left to the consumer.
*/
def apply(partition: Int, offset: String): Source[IoTMessage, NotUsed] = {
Source.fromGraph(new IoTMessageSource(partition, offset))
def apply(partition: Int, offset: String, withCheckpoints: Boolean): Source[IoTMessage, NotUsed] = {
Source.fromGraph(new IoTMessageSource(partition, offset, withCheckpoints))
}
/** Create an instance of the messages source for the specified partition
@ -34,42 +36,70 @@ private object IoTMessageSource {
* @return A source returning the body of the message sent from a device.
* Deserialization is left to the consumer.
*/
def apply(partition: Int, startTime: Instant): Source[IoTMessage, NotUsed] = {
Source.fromGraph(new IoTMessageSource(partition, startTime))
def apply(partition: Int, startTime: Instant, withCheckpoints: Boolean): Source[IoTMessage, NotUsed] = {
Source.fromGraph(new IoTMessageSource(partition, startTime, withCheckpoints))
}
}
/** Source of messages from one partition of the IoT hub storage
*
* @param partition Partition number (0 to N-1) to read from
* @param offset Starting position
*
* @todo Refactor and use async methods, compare performance
* @todo Consider option to deserialize on the fly to [T], assuming JSON format
*/
private class IoTMessageSource(val partition: Int, val offset: String)
extends GraphStage[SourceShape[IoTMessage]]
with Logger {
private class IoTMessageSource() extends GraphStage[SourceShape[IoTMessage]] with Logger {
abstract class OffsetType
case class SequenceOffset() extends OffsetType
case object UnspecifiedOffset extends OffsetType
case class TimeOffset() extends OffsetType
case object SequenceOffset extends OffsetType
def this(partition: Int, startTime: Instant) {
this(partition, "*not used*")
offsetType = TimeOffset()
_startTime = startTime
}
case object TimeOffset extends OffsetType
// When retrieving messages, include the message with the initial offset
private[this] val OFFSET_INCLUSIVE = true
private[this] val OffsetInclusive = true
// Time of offset used when defining the start of the stream
private[this] var offsetType: OffsetType = SequenceOffset()
private[this] var _partition : Option[Int] = None
private[this] var _offset : Option[String] = None
private[this] var _withCheckpoints: Option[Boolean] = None
private[this] var _startTime : Option[Instant] = None
private[this] var offsetType : OffsetType = UnspecifiedOffset
private[this] var _startTime: Instant = Instant.MIN
private[this] def partition: Int = _partition.get
private[this] def offset: String = _offset.get
private[this] def withCheckpoints: Boolean = _withCheckpoints.get
private[this] def startTime: Instant = _startTime.get
/** Source of messages from one partition of the IoT hub storage
*
* @param partition Partition number (0 to N-1) to read from
* @param offset Starting position
* @param withCheckpoints Whether to read/write current position
*/
def this(partition: Int, offset: String, withCheckpoints: Boolean) {
this()
_partition = Some(partition)
_offset = Some(offset)
_withCheckpoints = Some(withCheckpoints)
offsetType = SequenceOffset
}
/** Source of messages from one partition of the IoT hub storage
*
* @param partition Partition number (0 to N-1) to read from
* @param startTime Starting position
* @param withCheckpoints Whether to read/write current position
*/
def this(partition: Int, startTime: Instant, withCheckpoints: Boolean) {
this()
_partition = Some(partition)
_startTime = Some(startTime)
_withCheckpoints = Some(withCheckpoints)
offsetType = TimeOffset
}
// Define the (sole) output port of this stage
private[this] val out: Outlet[IoTMessage] = Outlet("IoTMessageSource")
@ -80,55 +110,66 @@ private class IoTMessageSource(val partition: Int, val offset: String)
// All state MUST be inside the GraphStageLogic, never inside the enclosing
// GraphStage. This state is safe to read/write from all the callbacks
// provided by GraphStageLogic and the registered handlers.
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
override def createLogic(attr: Attributes): GraphStageLogic = {
log.debug(s"Creating the IoT hub source")
new GraphStageLogic(shape) {
// Connect to the IoT hub storage
lazy val receiver: PartitionReceiver = offsetType match {
case SequenceOffset() ⇒ {
log.info(s"Connecting to partition ${partition.toString} starting from ${offset}")
IoTHubStorage
.createClient()
.createReceiverSync(
Configuration.receiverConsumerGroup,
partition.toString,
offset,
OFFSET_INCLUSIVE)
}
case TimeOffset() ⇒ {
log.info(s"Connecting to partition ${partition.toString} starting from ${_startTime}")
IoTHubStorage
.createClient()
.createReceiverSync(
Configuration.receiverConsumerGroup,
partition.toString,
_startTime)
}
}
val keepAliveSignal = new IoTMessage(None, None)
val emptyResult = List[IoTMessage](keepAliveSignal)
val emptyResult = new Array[IoTMessage](0).toList
lazy val receiver = getIoTHubReceiver
// @todo Consider pausing on empty partitions
setHandler(out, new OutHandler {
override def onPull(): Unit = {
try {
setHandler(out, new OutHandler {
log.debug(s"Defining the output handler")
val messages: java.lang.Iterable[EventData] =
receiver.receiveSync(Configuration.receiverBatchSize)
override def onPull(): Unit = {
try {
val messages = Retry(2, 1 seconds) {
receiver.receiveSync(Configuration.receiverBatchSize)
}
if (messages == null) {
log.debug(s"Partition ${partition} is empty")
emitMultiple(out, emptyResult)
} else {
val iterator: scala.collection.immutable.Iterable[IoTMessage] =
messages.asScala.map(e ⇒ IoTMessage(e, Some(partition))).toList
emitMultiple(out, iterator)
if (messages == null) {
log.debug(s"Partition ${partition} is empty")
emitMultiple(out, emptyResult)
} else {
val iterator = messages.asScala.map(e ⇒ IoTMessage(e, Some(partition))).toList
emitMultiple(out, iterator)
}
} catch {
case e: Exception ⇒ {
log.error(e, "Fatal error: " + e.getMessage)
}
}
} catch {
case e: Exception ⇒ {
log.error(e, "Fatal error: " + e.getMessage)
}
})
/** Connect to the IoT hub storage
*
* @return IoT hub storage receiver
*/
def getIoTHubReceiver: PartitionReceiver = Retry(3, 2 seconds) {
offsetType match {
case SequenceOffset ⇒ {
log.info(s"Connecting to partition ${partition.toString} starting from offset '${offset}'")
IoTHubStorage
.createClient()
.createReceiverSync(
Configuration.receiverConsumerGroup,
partition.toString,
offset,
OffsetInclusive)
}
case TimeOffset ⇒ {
log.info(s"Connecting to partition ${partition.toString} starting from time '${startTime}'")
IoTHubStorage
.createClient()
.createReceiverSync(
Configuration.receiverConsumerGroup,
partition.toString,
startTime)
}
}
}
})
}
}
}


@ -9,7 +9,7 @@ import akka.event.{LogSource, Logging}
*
* @see http://doc.akka.io/docs/akka/2.4.10/scala/logging.html
*/
private trait Logger {
private[iothubreact] trait Logger {
implicit val logSource = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName


@ -0,0 +1,15 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact
object Offset {
def apply(value: String) = new Offset(value)
}
/** Class used to pass the starting point to IoTHub storage
*
* @param value The offset value
*/
class Offset(val value: String) {
}


@ -0,0 +1,28 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
/** Akka streaming settings to resume the stream in case of errors
*
* @todo Review the usage of a supervisor with Akka streams
*/
case object ResumeOnError extends Logger {
private[this] val decider: Supervision.Decider = {
case e: Exception ⇒ {
log.error(e, e.getMessage)
Supervision.Resume
}
}
implicit val actorSystem = ActorSystem("ResumeOnErrorStream")
private[this] val settings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(settings)
}


@ -0,0 +1,42 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact
import scala.concurrent.duration.Duration
/** Retry logic
*/
private[iothubreact] object Retry extends Logger {
/** Retry to execute some code, pausing between the attempts
*
* @param times Number of attempts (>=1)
* @param pause Pause between attempts
* @param code Code to execute
* @tparam A Type of the result returned by `code`
*
* @return Result provided by `code`
*/
def apply[A](times: Int, pause: Duration)(code: A): A = {
var result: Option[A] = None
var remaining = times
while (remaining > 0) {
remaining -= 1
try {
result = Some(code)
remaining = 0
} catch {
case e: Exception ⇒ {
if (remaining > 0) {
log.warning(s"Retry loop: ${remaining} attempts left [${e.getMessage}]")
Thread.sleep(pause.toMillis)
} else {
throw e
}
}
}
}
result.get
}
}
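Usage mirrors the call sites elsewhere in this commit, e.g. wrapping a storage operation from within the iothubreact package (the wrapped client creation below is illustrative):

```scala
import scala.concurrent.duration._

// Up to 3 attempts in total, pausing 2 seconds between them;
// the last failure is rethrown to the caller
val client = Retry(3, 2 seconds) {
  IoTHubStorage.createClient()
}
```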


@ -0,0 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
/** Akka streaming settings to stop the stream in case of errors
*
* @todo Review the usage of a supervisor with Akka streams
*/
case object StopOnError extends Logger {
private[this] val decider: Supervision.Decider = {
case e: Exception ⇒ {
log.error(e, e.getMessage)
Supervision.Stop
}
}
implicit val actorSystem = ActorSystem("StopOnErrorStream")
private[this] val settings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(settings)
}


@ -0,0 +1,178 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing.backends
import java.io.IOException
import java.net.URISyntaxException
import java.util.UUID
import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHubPartition
import com.microsoft.azure.iot.iothubreact.{Logger, Retry}
import com.microsoft.azure.storage.blob.CloudBlockBlob
import com.microsoft.azure.storage.{AccessCondition, CloudStorageAccount, OperationContext, StorageException}
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
/** Storage logic to write checkpoints to Azure blobs
*/
private[iothubreact] class AzureBlob extends CheckpointBackend with Logger {
// Set the account to point either to Azure or the emulator
val account: CloudStorageAccount = if (Configuration.azureBlobEmulator)
CloudStorageAccount.getDevelopmentStorageAccount()
else
CloudStorageAccount.parse(Configuration.azureBlobConnectionString)
val client = account.createCloudBlobClient()
// Set the container, ensure it's ready
val container = client.getContainerReference(checkpointNamespace)
try {
Retry(2, 5 seconds) {
container.createIfNotExists()
}
} catch {
case e: StorageException ⇒ {
log.error(e, s"Err: ${e.getMessage}; Code: ${e.getErrorCode}; Status: ${e.getHttpStatusCode}")
throw e
}
case e: IOException ⇒ {
log.error(e, e.getMessage)
throw e
}
case e: Exception ⇒ {
log.error(e, e.getMessage)
throw e
}
}
/** Read the offset of the last record processed for the given partition
*
* @param partition Partition number
*
* @return Offset of the last record (already) processed
*/
override def readOffset(partition: Int): String = {
val file = getBlockBlobReference(partition)
try {
file.downloadText()
} catch {
case e: StorageException ⇒ {
if (e.getErrorCode == "BlobNotFound") {
IoTHubPartition.OffsetCheckpointNotFound
} else {
log.error(e, s"Err: ${e.getMessage}; Code: ${e.getErrorCode}; Status: ${e.getHttpStatusCode}")
throw e
}
}
case e: IOException ⇒ {
log.error(e, e.getMessage)
throw e
}
case e: Exception ⇒ {
log.error(e, e.getMessage)
throw e
}
}
}
/** Store the offset for the given IoT hub partition
*
* @param partition IoT hub partition number
* @param offset IoT hub partition offset
*/
override def writeOffset(partition: Int, offset: String): Unit = {
val file = getBlockBlobReference(partition)
val leaseId = acquireLease(file)
writeAndRelease(file, leaseId, offset)
}
private[this] def getBlockBlobReference(partition: Int): CloudBlockBlob = {
try {
Retry(2, 2 seconds) {
container.getBlockBlobReference(filename(partition))
}
} catch {
case e: StorageException ⇒ {
log.error(e, e.getMessage)
throw e
}
case e: URISyntaxException ⇒ {
log.error(e, e.getMessage)
throw e
}
case e: Exception ⇒ {
log.error(e, e.getMessage)
throw e
}
}
}
private[this] def acquireLease(file: CloudBlockBlob): String = {
// Note: the lease ID must be a Guid otherwise the service returns 400
var leaseId = UUID.randomUUID().toString
try {
file.acquireLease(Configuration.azureBlobLeaseDuration.toSeconds.toInt, leaseId)
} catch {
case e: StorageException ⇒ {
if (e.getErrorCode == "BlobNotFound") {
leaseId = ""
} else {
log.error(e, s"Err: ${e.getMessage}; Code: ${e.getErrorCode}; Status: ${e.getHttpStatusCode}")
throw e
}
}
case e: Exception ⇒ {
log.error(e, e.getMessage)
throw e
}
}
leaseId
}
private[this] def writeAndRelease(file: CloudBlockBlob, leaseId: String, content: String): Unit = {
// The access condition depends on the file existing
val accessCondition = if (leaseId == "")
AccessCondition.generateEmptyCondition()
else
AccessCondition.generateLeaseCondition(leaseId)
try {
file.uploadText(content, "UTF-8", accessCondition, null, new OperationContext)
// If this is a new file, there is no lease to release
if (leaseId != "") file.releaseLease(accessCondition)
} catch {
case e: StorageException ⇒ {
log.error(e, e.getMessage)
throw e
}
case e: IOException ⇒ {
log.error(e, e.getMessage)
throw e
}
case e: Exception ⇒ {
log.error(e, e.getMessage)
throw e
}
}
}
private[this] def filename(partition: Int): String = "partition-" + partition
}


@ -0,0 +1,25 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing.backends
import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration
private[iothubreact] trait CheckpointBackend {
def checkpointNamespace: String = Configuration.storageNamespace
/** Read the offset of the last record processed for the given partition
*
* @param partition IoT hub partition number
*
* @return Offset of the last record (already) processed
*/
def readOffset(partition: Int): String
/** Store the offset for the given IoT hub partition
*
* @param partition IoT hub partition number
* @param offset IoT hub partition offset
*/
def writeOffset(partition: Int, offset: String): Unit
}
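As a sketch of the contract above, a hypothetical in-memory backend (useful for unit tests, not part of this commit) could look like:

```scala
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHubPartition

// Illustrative only: keeps checkpoints in a Map instead of durable storage
private[iothubreact] class InMemoryBackend extends CheckpointBackend {
  private[this] var offsets = Map.empty[Int, String]

  // Return the sentinel value when no checkpoint exists yet for the partition
  override def readOffset(partition: Int): String =
    offsets.getOrElse(partition, IoTHubPartition.OffsetCheckpointNotFound)

  override def writeOffset(partition: Int, offset: String): Unit =
    offsets += partition -> offset
}
```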


@ -0,0 +1,36 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import scala.language.{implicitConversions, postfixOps}
/** The actors infrastructure for storing the stream position
*/
private[iothubreact] object CheckpointActorSystem {
implicit private[this] val actorSystem = ActorSystem("IoTHubReact")
implicit private[this] val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem))
var localRegistry: Map[String, ActorRef] = Map[String, ActorRef]()
/** Create an actor to read/write offset checkpoints from the storage
*
* @param partition IoT hub partition number
*
* @return Actor reference
*/
def getCheckpointService(partition: Int): ActorRef = {
val actorPath = "CheckpointService" + partition
localRegistry get actorPath match {
case Some(actorRef) => actorRef
case None => {
val actorRef = actorSystem.actorOf(Props(new CheckpointService(partition)), actorPath)
localRegistry += Tuple2(actorPath, actorRef)
actorRef
}
}
}
}


@ -0,0 +1,181 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing
import java.time.Instant
import java.util.concurrent.Executors
import akka.actor.{Actor, Stash}
import com.microsoft.azure.iot.iothubreact.Logger
import com.microsoft.azure.iot.iothubreact.checkpointing.CheckpointService.{GetOffset, StoreOffset, UpdateOffset}
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.{AzureBlob, CassandraTable, CheckpointBackend}
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHubPartition
import scala.concurrent.ExecutionContext
private[iothubreact] object CheckpointService {
// Command used to read the current partition position
case object GetOffset
// Command used to update the position stored in memory
case class UpdateOffset(value: String)
// Command used to write the position from memory to storage
case object StoreOffset
}
/** Checkpointing agent. Takes care of initializing the right storage, reading and writing to it.
* Each agent instance works on a single IoT hub partition
*
* @param partition IoT hub partition number [0..N]
*/
private[iothubreact] class CheckpointService(partition: Int)
extends Actor
with Stash
with Logger {
type OffsetsData = Tuple3[String, Long, Long]
implicit val executionContext = ExecutionContext
.fromExecutorService(Executors.newFixedThreadPool(sys.runtime.availableProcessors))
// Contains the offsets up to one hour ago, max 1 offset per second (max size = 3600)
private[this] val queue = new scala.collection.mutable.Queue[OffsetsData]
// Count the offsets tracked in the queue (!= queue.size)
private[this] var queuedOffsets: Long = 0
private[this] var currentOffset: String = IoTHubPartition.OffsetStartOfStream
private[this] val storage = getCheckpointBackend
// Before the actor starts we schedule a recurring storage write
override def preStart(): Unit = {
val time = Configuration.checkpointFrequency
context.system.scheduler.schedule(time, time, self, StoreOffset)
log.info(s"Scheduled checkpoint for partition ${partition} every ${time.toMillis} ms")
}
override def receive: Receive = notReady
// At the beginning the actor can only read, stashing other commands for later
def notReady: Receive = {
case _ => {
try {
context.become(busyReading)
stash()
log.debug(s"Retrieving partition ${partition} offset from the storage")
val offset = storage.readOffset(partition)
if (offset != IoTHubPartition.OffsetCheckpointNotFound) {
currentOffset = offset
}
log.debug(s"Offset retrieved for partition ${partition}: ${currentOffset}")
context.become(ready)
queuedOffsets = 0
}
catch {
case e: Exception => {
log.error(e, e.getMessage)
context.become(notReady)
}
}
finally {
unstashAll()
}
}
}
// While reading the offset, we stash all commands, to avoid concurrent GetOffset commands
def busyReading: Receive = {
case _ => stash()
}
// After loading the offset from the storage, the actor is ready to process all commands
def ready: Receive = {
case GetOffset => sender() ! currentOffset
case UpdateOffset(value: String) => updateOffsetAction(value)
case StoreOffset => {
try {
if (queue.size > 0) {
context.become(busyWriting)
var offsetToStore: String = ""
val now = Instant.now.getEpochSecond
val timeThreshold = Configuration.checkpointTimeThreshold.toSeconds
val countThreshold = Configuration.checkpointCountThreshold
// Check if the queue contains old offsets to flush (time threshold)
// Check if the queue contains data of too many messages (count threshold)
while (queue.size > 0 && ((queuedOffsets >= countThreshold) || ((now - timeOf(queue.head)) >= timeThreshold))) {
val data = queue.dequeue()
offsetToStore = offsetOf(data)
queuedOffsets -= countOf(data)
if (queue.size == 0) queuedOffsets = 0
}
if (offsetToStore == "") {
log.debug(s"Checkpoint skipped: partition=${partition}, count ${queuedOffsets} < threshold ${Configuration.checkpointCountThreshold}")
} else {
log.info(s"Writing checkpoint: partition=${partition}, storing ${offsetToStore} (current offset=${currentOffset})")
storage.writeOffset(partition, offsetToStore)
}
} else {
log.debug(s"Partition=${partition}, checkpoint queue is empty [count ${queuedOffsets}, current offset=${currentOffset}]")
}
} catch {
case e: Exception => log.error(e, e.getMessage)
} finally {
context.become(ready)
}
}
}
// While writing we discard StoreOffset signals
def busyWriting: Receive = {
case GetOffset => sender() ! currentOffset
case UpdateOffset(value: String) => updateOffsetAction(value)
case StoreOffset => {}
}
def updateOffsetAction(offset: String) = {
if (offset.toLong > currentOffset.toLong) {
val epoch = Instant.now.getEpochSecond
// Reminder:
// queue.enqueue -> queue.last == queue(queue.size -1)
// queue.dequeue -> queue.head == queue(0)
// If the tail of the queue contains an offset stored in the current second, then increment
// the count of messages for that second. Otherwise enqueue a new element.
if (queue.size > 0 && epoch == timeOf(queue.last))
queue.update(queue.size - 1, Tuple3(offset, epoch, countOf(queue.last) + 1))
else
queue.enqueue(Tuple3(offset, epoch, 1))
queuedOffsets += 1
currentOffset = offset
}
}
// @todo Support plugins
def getCheckpointBackend: CheckpointBackend = {
val conf = Configuration.checkpointBackendType
conf.toUpperCase match {
case "AZUREBLOB" => new AzureBlob
case "CASSANDRA" => new CassandraTable
case _ => throw new UnsupportedOperationException(s"Unknown storage type ${conf}")
}
}
def offsetOf(x: OffsetsData): String = x._1
def timeOf(x: OffsetsData): Long = x._2
def countOf(x: OffsetsData): Long = x._3
}
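The bookkeeping in `updateOffsetAction` is easy to miss: offsets arriving within the same second are merged into the tail entry of the queue, so the queue holds at most one entry per second while `queuedOffsets` still counts every message. A standalone sketch of that rule (`"-1"` is assumed as the start-of-stream offset, and `epoch` is passed in rather than taken from `Instant.now` to keep it deterministic):

```scala
import scala.collection.mutable

// One queue entry: (offset, epoch second, messages counted in that second)
final case class OffsetEntry(offset: String, epoch: Long, count: Long)

class OffsetQueueSketch {
  val queue = mutable.Queue[OffsetEntry]()
  var queuedOffsets: Long = 0
  var currentOffset: String = "-1" // assumed start-of-stream marker

  // Mirror of updateOffsetAction: only move forward, merge same-second offsets
  def update(offset: String, epoch: Long): Unit =
    if (offset.toLong > currentOffset.toLong) {
      if (queue.nonEmpty && epoch == queue.last.epoch)
        queue.update(queue.size - 1,
                     OffsetEntry(offset, epoch, queue.last.count + 1))
      else
        queue.enqueue(OffsetEntry(offset, epoch, 1))
      queuedOffsets += 1
      currentOffset = offset
    }
}
```

Bounding the queue to one entry per second is what keeps its maximum size at 3600 for an hour of history, regardless of message rate.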

@@ -0,0 +1,156 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing
import java.util.concurrent.TimeUnit
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.duration._
import scala.language.postfixOps
/** Hold IoT Hub stream checkpointing configuration settings
*
* @todo Allow using multiple configurations; for instance, when processing multiple
* streams a client needs a dedicated checkpoint container for each stream
*/
private[iothubreact] object Configuration {
private[this] val confPath = "iothub-checkpointing."
private[this] val conf: Config = ConfigFactory.load()
// Default time between checkpoint writes to the storage
private[this] val DefaultFrequency = 1 second
// Minimum time between checkpoints
private[this] val MinFrequency = 1 second
// Maximum time between checkpoints
private[this] val MaxFrequency = 60 seconds
// Default timeout for checkpoint operations
private[this] val DefaultStorageRWTimeout = 10 seconds
// Minimum timeout for checkpoint operations
private[this] val MinStorageRWTimeout = 1 seconds
// Maximum timeout for checkpoint operations
private[this] val MaxStorageRWTimeout = 60 seconds
// Default duration of the lock on the checkpoint resources
private[this] val DefaultLease = 10 seconds
// Minimum duration of the lock on the checkpoint resources
private[this] val MinLease = 15 seconds
// Maximum duration of the lock on the checkpoint resources
private[this] val MaxLease = 60 seconds
// Default time waited before flushing an offset to storage
private[this] val DefaultTimeThreshold = 5 minutes
// Minimum time waited before flushing an offset to storage
private[this] val MinTimeThreshold = 1 second
// Maximum time waited before flushing an offset to storage
private[this] val MaxTimeThreshold = 1 hour
// Default name of the container used to store checkpoint data
private[this] val DefaultContainer = "iothub-react-checkpoints"
// Whether checkpointing is enabled or not
lazy val isEnabled: Boolean = conf.getBoolean(confPath + "enabled")
// How often checkpoint data is written to the storage
lazy val checkpointFrequency: FiniteDuration = getDuration(
confPath + "frequency",
DefaultFrequency,
MinFrequency,
MaxFrequency)
// How many messages to replay after a restart, for each IoT hub partition
lazy val checkpointCountThreshold = Math.max(1, conf.getInt(confPath + "countThreshold"))
// Store a position if its value is older than this amount of time, rounded to seconds
// Min: 1 second, Max: 1 hour
lazy val checkpointTimeThreshold = getDuration(
confPath + "timeThreshold",
DefaultTimeThreshold,
MinTimeThreshold,
MaxTimeThreshold)
// Checkpointing operations timeout
lazy val checkpointRWTimeout: FiniteDuration = getDuration(
confPath + "storage.rwTimeout",
DefaultStorageRWTimeout,
MinStorageRWTimeout,
MaxStorageRWTimeout)
// The backend logic used to write, a.k.a. the storage type
lazy val checkpointBackendType: String = conf.getString(confPath + "storage.backendType")
// Data container
lazy val storageNamespace: String = getStorageContainer
// Whether to use the Azure Storage Emulator when using Azure blob backend
lazy val azureBlobEmulator: Boolean = conf.getBoolean(confPath + "storage.azureblob.useEmulator")
// Azure blob connection string
lazy val azureBlobConnectionString: String = getAzureBlobConnectionString
// Azure blob lease duration (between 15 and 60 seconds, per Azure docs)
lazy val azureBlobLeaseDuration: FiniteDuration = getDuration(
confPath + "storage.azureblob.lease",
15 seconds,
15 seconds,
60 seconds)
// Cassandra cluster address
lazy val cassandraCluster : String = conf.getString(confPath + "storage.cassandra.cluster")
lazy val cassandraReplicationFactor: Int = conf.getInt(confPath + "storage.cassandra.replicationFactor")
/** Load Azure blob connection string, taking care of the Azure storage emulator case
*
* @return Connection string
*/
private[this] def getAzureBlobConnectionString: String = {
if (conf.getBoolean(confPath + "storage.azureblob.useEmulator"))
""
else {
val protocol = conf.getString(confPath + "storage.azureblob.protocol")
val account = conf.getString(confPath + "storage.azureblob.account")
val key = conf.getString(confPath + "storage.azureblob.key")
s"DefaultEndpointsProtocol=${protocol};AccountName=${account};AccountKey=${key}"
}
}
/** Get the name of the table/container/path etc where data is stored
*
* @return Table/Container/Path name
*/
private[this] def getStorageContainer: String = {
val container = conf.getString(confPath + "storage.namespace")
if (container != "")
container
else
DefaultContainer
}
/** Read a duration setting from the configuration, validating it against a range
*
* @return The configured duration if within [min, max], otherwise the default
*/
private[this] def getDuration(
path: String,
default: FiniteDuration,
min: FiniteDuration,
max: FiniteDuration): FiniteDuration = {
val value = conf.getDuration(path, TimeUnit.MILLISECONDS)
if (value >= min.toMillis && value <= max.toMillis)
FiniteDuration(value, TimeUnit.MILLISECONDS)
else
default
}
}
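The validation rule in `getDuration` rejects out-of-range values silently, falling back to the default rather than failing. Extracted as a pure function (names are mine, not from the library):

```scala
import scala.concurrent.duration._

// Mirror of getDuration's validation: accept the configured value only when
// it falls within [min, max]; otherwise fall back to the default
def validatedDuration(configured: FiniteDuration,
                      default: FiniteDuration,
                      min: FiniteDuration,
                      max: FiniteDuration): FiniteDuration =
  if (configured >= min && configured <= max) configured else default
```

Note the design choice: a misconfigured value never stops the stream, it just reverts to a safe default, which fits a long-running telemetry pipeline better than a startup failure would.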

@@ -0,0 +1,50 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.microsoft.azure.iot.iothubreact.IoTMessage
import com.microsoft.azure.iot.iothubreact.checkpointing.CheckpointService.UpdateOffset
/** Flow receiving and emitting IoT messages, while keeping note of the last offset seen
*
* @param partition IoT hub partition number
*/
private[iothubreact] class SavePositionOnPull(partition: Int)
extends GraphStage[FlowShape[IoTMessage, IoTMessage]] {
val in = Inlet[IoTMessage]("Checkpoint.Flow.in")
val out = Outlet[IoTMessage]("Checkpoint.Flow.out")
val none = ""
override val shape = FlowShape.of(in, out)
// All state MUST be inside the GraphStageLogic, never inside the enclosing
// GraphStage. This state is safe to read/write from all the callbacks
// provided by GraphStageLogic and the registered handlers.
override def createLogic(attr: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
val checkpointService = CheckpointActorSystem.getCheckpointService(partition)
var lastOffsetSent = none
// when a message enters the stage we save its offset
setHandler(in, new InHandler {
override def onPush(): Unit = {
val message: IoTMessage = grab(in)
if (!message.isKeepAlive) lastOffsetSent = message.offset
push(out, message)
}
})
// when asked for more data, the last offset emitted is considered processed and is sent to the checkpoint service
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (lastOffsetSent != none) checkpointService ! UpdateOffset(lastOffsetSent)
pull(in)
}
})
}
}
}

@@ -0,0 +1,47 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing.backends
import com.microsoft.azure.iot.iothubreact.Logger
import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Connection
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.{CheckpointRecord, CheckpointsTableSchema}
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHubPartition
import org.json4s.JsonAST
/** Storage logic to write checkpoints to a Cassandra table
*/
private[iothubreact] class CassandraTable extends CheckpointBackend with Logger {
val schema = new CheckpointsTableSchema(checkpointNamespace, "checkpoints")
val connection = Connection(Configuration.cassandraCluster, Configuration.cassandraReplicationFactor, schema)
val table = connection.getTable[CheckpointRecord]()
connection.createKeyspaceIfNotExists()
connection.createTableIfNotExists()
/** Read the offset of the last record processed for the given partition
*
* @param partition IoT hub partition number
*
* @return Offset of the last record (already) processed
*/
override def readOffset(partition: Int): String = {
val result: JsonAST.JObject = table.select(s"partition = ${partition}")
if (result.values("partition").asInstanceOf[BigInt] < 0) {
IoTHubPartition.OffsetCheckpointNotFound
} else {
result.values("offset").asInstanceOf[String]
}
}
/** Store the offset for the given IoT hub partition
*
* @param partition IoT hub partition number
* @param offset IoT hub partition offset
*/
override def writeOffset(partition: Int, offset: String): Unit = {
table.updateRow(CheckpointRecord(partition, offset))
}
}

@@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.ToCassandra
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
private[iothubreact] case class CheckpointRecord(partition: Int, offset: String)
extends ToCassandra {
/** Convert record to JSON
*
* @return JSON string
*/
override def toJsonValues: String = {
val now = java.time.Instant.now.toString
val json = ("partition" -> partition) ~ ("offset" -> offset) ~ ("lastUpdate" -> now)
compact(render(json))
}
}
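`toJsonValues` feeds Cassandra's `INSERT ... JSON` syntax, so the record must serialize to a flat JSON object. Without the json4s DSL, the same shape can be sketched with plain string interpolation (the class name and the `lastUpdate` parameter are mine; the real record takes the timestamp from `Instant.now`):

```scala
// json4s-free sketch of CheckpointRecord.toJsonValues: the record becomes
// a flat JSON object with partition, offset, and last-update timestamp
final case class CheckpointRecordSketch(partition: Int, offset: String) {
  def toJsonValues(lastUpdate: String): String =
    s"""{"partition":$partition,"offset":"$offset","lastUpdate":"$lastUpdate"}"""
}
```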

@@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib._
/** Schema of the table containing the checkpoints
*/
private[iothubreact] class CheckpointsTableSchema(keySpace: String, tableName: String) extends TableSchema {
// Container name
override val keyspace: String = keySpace
// Table name
override val name: String = tableName
// Columns
override val columns: Seq[Column] = Seq(
Column("partition", ColumnType.Int, true),
Column("offset", ColumnType.String),
Column("lastUpdate", ColumnType.Timestamp)
)
}

@@ -0,0 +1,50 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.ColumnType.ColumnType
/** Column types supported
*/
private[iothubreact] object ColumnType extends Enumeration {
type ColumnType = Value
val String, Timestamp, Double, Int = Value
/** Map type to string used in CQL
*
* @param columnType Column type
*
* @return Type as string
*/
def toString(columnType: ColumnType): String = columnType match {
case ColumnType.String => "text"
case ColumnType.Timestamp => "timestamp"
case ColumnType.Double => "double"
case ColumnType.Int => "int"
case _ => throw new RuntimeException(s"Missing mapping for Cassandra type ${columnType}")
}
/** Parse name to enum
*
* @param typeAsString Type as string
*
* @return Column type
*/
def fromName(typeAsString: String): ColumnType = typeAsString match {
case "text" => ColumnType.String
case "timestamp" => ColumnType.Timestamp
case "double" => ColumnType.Double
case "int" => ColumnType.Int
case _ => throw new IllegalArgumentException(s"Unknown Cassandra column type '${typeAsString}'")
}
}
/** Create a column instance
*
* @param name Name of the column
* @param `type` Type of the column
* @param index Whether the column value is part of the primary key
*/
private[iothubreact] case class Column(val name: String, val `type`: ColumnType, val index: Boolean = false)

@@ -0,0 +1,73 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib
import com.datastax.driver.core.Cluster
/** Cassandra connection
*
* @param contactPoint Hostname (and port, e.g. 1.2.3.4:9042)
* @param replicationFactor Table replication factor
* @param table Table schema
*/
private[iothubreact] case class Connection(
contactPoint: String,
replicationFactor: Int,
table: TableSchema) {
private lazy val hostPort = extractHostPort()
private lazy val cluster = Cluster.builder().addContactPoint(hostPort._1).withPort(hostPort._2).build()
implicit lazy val session = cluster.connect()
/** Create the key space if not present
*/
def createKeyspaceIfNotExists(): Unit = {
val cql = s"CREATE KEYSPACE IF NOT EXISTS ${table.keyspaceCQL}" +
s" WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor':${replicationFactor}};"
session.execute(cql)
}
/** Create the table if not present
*/
def createTableIfNotExists(): Unit = {
createT(table.nameCQL, table.columns)
}
/** Get an instance of the table
*
* @tparam A Type of the records stored in the table
*
* @return Table instance
*/
def getTable[A <: ToCassandra](): Table[A] = {
Table[A](session, table.keyspaceCQL, table.nameCQL)
}
/** Parse the hostname and extract host + port
*
* @return host and port tuple
*/
private[this] def extractHostPort(): (String, Int) = {
val tokens = contactPoint.split(":")
val addr = tokens(0)
val port = if (tokens.size == 2)
tokens(1).toInt
else
9042
(addr, port)
}
/** Generate CQL to create table using column names and index definitions
*
* @param tableName Table name
* @param columns Columns
*/
private[this] def createT(tableName: String, columns: Seq[Column]): Unit = {
val columnsSql = columns.foldLeft("")((b, a) => s"$b\n${a.name} ${ColumnType.toString(a.`type`)},")
val indexesSql = columns.filter(_.index).map(_.name).mkString("PRIMARY KEY(", ", ", ")")
val createTable = s"CREATE TABLE IF NOT EXISTS ${table.keyspaceCQL}.$tableName($columnsSql $indexesSql)"
session.execute(createTable)
}
}
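The CQL generation in `createT` folds the column list into a comma-separated definition and appends the indexed columns as the primary key. The string-building logic can be checked in isolation (the `Col` stand-in replaces the `Column`/`ColumnType` pair so the sketch is self-contained):

```scala
// Stand-in for Column with the CQL type already rendered as a string
final case class Col(name: String, cqlType: String, index: Boolean = false)

// Same fold/filter structure as Connection.createT
def createTableCql(keyspace: String, table: String, columns: Seq[Col]): String = {
  val columnsSql = columns.foldLeft("")((b, a) => s"$b\n${a.name} ${a.cqlType},")
  val indexesSql = columns.filter(_.index).map(_.name)
    .mkString("PRIMARY KEY(", ", ", ")")
  s"CREATE TABLE IF NOT EXISTS $keyspace.$table($columnsSql $indexesSql)"
}
```

For the checkpoints schema this yields a table keyed by `partition`, which is exactly what the one-row-per-partition upsert relies on.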

@@ -0,0 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib
/** Trait to be implemented by records to be stored into Cassandra
*/
private[iothubreact] trait ToCassandra {
def toJsonValues: String
}

@@ -0,0 +1,61 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib
import com.datastax.driver.core.Session
import org.json4s.JsonAST.JObject
import org.json4s.JsonDSL._
import scala.collection.JavaConverters._
import scala.language.{implicitConversions, postfixOps}
/** CQL table methods
*
* @param session Cassandra session
* @param keyspace Key space
* @param tableName Table name
* @tparam T Record type
*/
private[iothubreact] case class Table[T <: ToCassandra](session: Session, keyspace: String, tableName: String) {
/** Insert a record (upsert)
*
* @param record Record data
*/
def insertRow(record: T) = {
session.execute(s"INSERT INTO $keyspace.$tableName JSON '${record.toJsonValues}'")
}
/** Update a record (upsert)
*
* @param record Record data
*/
def updateRow(record: T): Unit = {
session.execute(s"INSERT INTO $keyspace.$tableName JSON '${record.toJsonValues}'")
}
/** Retrieve a record
*
* @todo return a T object
*
* @param condition CQL condition
*
* @return a record as string
*/
def select(condition: String): JObject = {
val row = session.execute(s"SELECT * FROM $keyspace.$tableName WHERE ${condition}").one()
var partition = -1
var offset = ""
if (row != null) {
row.getColumnDefinitions.asScala.foreach(definition => {
val fieldName = definition.getName
if (fieldName == "partition") partition = row.getInt(fieldName)
if (fieldName == "offset") offset = row.getString(fieldName)
})
}
("partition" -> partition) ~ ("offset" -> offset)
}
}

@@ -0,0 +1,16 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib
/** Interface to be implemented to define the schema of the table
*/
private[iothubreact] trait TableSchema {
protected[this] val keyspace: String
protected[this] val name : String
val columns: Seq[Column]
// Hide the original values and expose the filtered ones
// https://docs.datastax.com/en/cql/3.3/cql/cql_reference/ref-lexical-valid-chars.html
lazy val keyspaceCQL = this.keyspace.replaceAll("[^A-Za-z0-9_]", "_")
lazy val nameCQL = this.name.replaceAll("[^A-Za-z0-9_]", "_")
}
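The sanitization above maps arbitrary namespace strings onto CQL-safe identifiers by replacing every character outside `[A-Za-z0-9_]` with an underscore. Extracted as a one-liner (the function name is mine):

```scala
// CQL identifier sanitization used by TableSchema.keyspaceCQL/nameCQL
def toCqlIdentifier(raw: String): String = raw.replaceAll("[^A-Za-z0-9_]", "_")
```

This matters because the default container name, `iothub-react-checkpoints`, contains hyphens that CQL identifiers do not allow.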

@@ -0,0 +1,17 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.filters
import com.microsoft.azure.iot.iothubreact.IoTMessage
/** Set of filters to ignore IoT traffic
*
*/
private[iothubreact] object Ignore {
/** Ignore the keep alive signal injected by IoTMessageSource
*
* @return True if the message must be processed
*/
def keepAlive = (m: IoTMessage) => !m.isKeepAlive
}

@@ -0,0 +1,13 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.filters
import com.microsoft.azure.iot.iothubreact.IoTMessage
object Model {
def apply(model: String)(m: IoTMessage) = new Model(model).only(m)
}
class Model(val model: String) {
def only(m: IoTMessage): Boolean = m.model == model
}
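Both filters are plain `IoTMessage => Boolean` predicates, so they compose directly with `Source.filter`. A sketch of how they combine, using a minimal stand-in for `IoTMessage` (only the two fields the filters touch) and a `List` in place of an Akka `Source`:

```scala
// Stand-in for IoTMessage, reduced to the fields the filters use
final case class Msg(model: String, isKeepAlive: Boolean)

// Same shapes as Ignore.keepAlive and Model(...)
val keepAlive: Msg => Boolean = m => !m.isKeepAlive
def modelFilter(model: String)(m: Msg): Boolean = m.model == model

val stream = List(Msg("temperature", false), Msg("", true), Msg("humidity", false))

// Drop keep-alive signals, then keep one message type ("model")
val temps = stream.filter(keepAlive).filter(modelFilter("temperature"))
```

With the real library the same chain would read `hub.source().filter(Ignore.keepAlive).filter(Model("temperature"))`, which is the message-type filtering this commit introduces.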

@@ -6,59 +6,19 @@ import java.time.Instant
import akka.NotUsed
import akka.stream.javadsl.{Source => SourceJavaDSL}
import com.microsoft.azure.eventhubs.PartitionReceiver
import com.microsoft.azure.iot.iothubreact.IoTMessage
import com.microsoft.azure.iot.iothubreact.{IoTMessage, Offset}
import com.microsoft.azure.iot.iothubreact.scaladsl.{IoTHub => IoTHubScalaDSL}
import scala.collection.JavaConverters._
/** Provides a streaming source to retrieve messages from Azure IoT Hub
*
* @todo Support reading the same partition from multiple clients
* @todo (*) Provide ClearCheckpoints() method to clear the state
*/
class IoTHub() extends {
// Offset used to start reading from the beginning
val OffsetStartOfStream: String = PartitionReceiver.START_OF_STREAM
class IoTHub() {
private lazy val iotHub = new IoTHubScalaDSL()
/** Stream returning all the messages since the beginning, from the specified
* partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
*
* @return A source of IoT messages
*/
def source(partition: Int): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(partition))
}
/** Stream returning all the messages from the given offset, from the
* specified partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
* @param offset Starting position, offset of the first message
*
* @return A source of IoT messages
*/
def source(partition: Int, offset: String): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(partition, offset))
}
/** Stream returning all the messages from the given offset, from the
* specified partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
* @param startTime Starting position expressed in time
*
* @return A source of IoT messages
*/
def source(partition: Int, startTime: Instant): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(partition, startTime))
}
/** Stream returning all the messages since the beginning, from all the
* configured partitions.
*
@ -68,17 +28,6 @@ class IoTHub() extends {
new SourceJavaDSL(iotHub.source())
}
/** Stream returning all the messages starting from the given offset, from all
* the configured partitions.
*
* @param offset Starting position
*
* @return A source of IoT messages
*/
def source(offset: String): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(offset))
}
/** Stream returning all the messages starting from the given time, from all
* the configured partitions.
*
@ -89,4 +38,51 @@ class IoTHub() extends {
def source(startTime: Instant): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(startTime))
}
/** Stream returning all the messages from all the configured partitions.
* If checkpointing is enabled, the stream starts from the last position saved; otherwise
* it starts from the beginning.
*
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(withCheckpoints: Boolean): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(withCheckpoints))
}
/** Stream returning all the messages starting from the given offset, from all
* the configured partitions.
*
* @param offsets Starting position for all the partitions
*
* @return A source of IoT messages
*/
def source(offsets: java.util.Collection[Offset]): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(offsets.asScala.toList))
}
/** Stream returning all the messages starting from the given time, from all
* the configured partitions.
*
* @param startTime Starting position expressed in time
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(startTime: Instant, withCheckpoints: Boolean): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(startTime, withCheckpoints))
}
/** Stream returning all the messages starting from the given offset, from all
* the configured partitions.
*
* @param offsets Starting position for all the partitions
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(offsets: java.util.Collection[Offset], withCheckpoints: Boolean): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHub.source(offsets.asScala.toList, withCheckpoints))
}
}

@@ -0,0 +1,94 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.javadsl
import java.time.Instant
import akka.NotUsed
import akka.stream.javadsl.{Source => SourceJavaDSL}
import com.microsoft.azure.iot.iothubreact.{IoTMessage, Offset}
import com.microsoft.azure.iot.iothubreact.scaladsl.{IoTHubPartition => IoTHubPartitionScalaDSL}
/** Provides a streaming source to retrieve messages from one Azure IoT Hub partition
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
*
* @todo (*) Provide ClearCheckpoints() method to clear the state
* @todo Support reading the same partition from multiple clients
*/
class IoTHubPartition(val partition: Int) {
// Offset used to start reading from the beginning
final val OffsetStartOfStream: String = IoTHubPartitionScalaDSL.OffsetStartOfStream
// Public constant: used internally to signal when there is no position saved in the storage
// To be used by custom backend implementations
final val OffsetCheckpointNotFound: String = IoTHubPartitionScalaDSL.OffsetCheckpointNotFound
private lazy val iotHubPartition = new IoTHubPartitionScalaDSL(partition)
/** Stream returning all the messages since the beginning, from the specified
* partition.
*
* @return A source of IoT messages
*/
def source(): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHubPartition.source())
}
/** Stream returning all the messages from the given time, from the
* specified partition.
*
* @param startTime Starting position expressed in time
*
* @return A source of IoT messages
*/
def source(startTime: Instant): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHubPartition.source(startTime))
}
/** Stream returning all the messages. If checkpointing is enabled, the stream starts from the last position
* saved, otherwise it starts from the beginning.
*
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(withCheckpoints: Boolean): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHubPartition.source(withCheckpoints))
}
/** Stream returning all the messages from the given offset, from the
* specified partition.
*
* @param offset Starting position, offset of the first message
*
* @return A source of IoT messages
*/
def source(offset: Offset): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHubPartition.source(offset))
}
/** Stream returning all the messages from the given start time
*
* @param startTime Starting position expressed in time
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(startTime: Instant, withCheckpoints: Boolean): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHubPartition.source(startTime, withCheckpoints))
}
/** Stream returning all the messages from the given offset
*
* @param offset Starting position, offset of the first message
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(offset: Offset, withCheckpoints: Boolean): SourceJavaDSL[IoTMessage, NotUsed] = {
new SourceJavaDSL(iotHubPartition.source(offset, withCheckpoints))
}
}

@@ -7,72 +7,35 @@ import java.time.Instant
import akka.NotUsed
import akka.stream.SourceShape
import akka.stream.scaladsl._
import com.microsoft.azure.eventhubs.PartitionReceiver
import com.microsoft.azure.iot.iothubreact._
import com.microsoft.azure.iot.iothubreact.checkpointing.{Configuration => CPConfiguration}
import scala.language.postfixOps
object IoTHub {
def apply(): IoTHub = new IoTHub
}
/** Provides a streaming source to retrieve messages from Azure IoT Hub
*
* @todo Support reading the same partition from multiple clients
* @todo (*) Provide ClearCheckpoints() method to clear the state
*/
class IoTHub {
class IoTHub extends Logger {
// Offset used to start reading from the beginning
val OffsetStartOfStream: String = PartitionReceiver.START_OF_STREAM
private[this] def fromStart =
Some(List.fill[Offset](Configuration.iotHubPartitions)(Offset(IoTHubPartition.OffsetStartOfStream)))
/** Stream returning all the messages since the beginning, from the specified
* partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
/** Stream returning all the messages from all the configured partitions.
* If checkpointing is enabled, the stream starts from the last position saved; otherwise
* it starts from the beginning.
*
* @return A source of IoT messages
*/
def source(partition: Int): Source[IoTMessage, NotUsed] = {
IoTMessageSource(partition, OffsetStartOfStream)
}
/** Stream returning all the messages from the given offset, from the
* specified partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
* @param offset Starting position, offset of the first message
*
* @return A source of IoT messages
*/
def source(partition: Int, offset: String): Source[IoTMessage, NotUsed] = {
IoTMessageSource(partition, offset)
}
/** Stream returning all the messages from the given offset, from the
* specified partition.
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
* @param startTime Starting position expressed in time
*
* @return A source of IoT messages
*/
def source(partition: Int, startTime: Instant): Source[IoTMessage, NotUsed] = {
IoTMessageSource(partition, startTime)
}
/** Stream returning all the messages since the beginning, from all the
* configured partitions.
*
* @return A source of IoT messages
*/
def source(): Source[IoTMessage, NotUsed] = source(OffsetStartOfStream)
/** Stream returning all the messages starting from the given offset, from all
* the configured partitions.
*
* @param offset Starting position
*
* @return A source of IoT messages
*/
def source(offset: String): Source[IoTMessage, NotUsed] = {
source(offset, Instant.MIN, false)
def source(): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = false,
offsets = fromStart,
withCheckpoints = false)
}
/** Stream returning all the messages starting from the given time, from all
@ -83,21 +46,86 @@ class IoTHub {
* @return A source of IoT messages
*/
def source(startTime: Instant): Source[IoTMessage, NotUsed] = {
source("", startTime, true)
getSource(
withTimeOffset = true,
startTime = startTime,
withCheckpoints = false)
}
/** Stream returning all the messages, from the given starting point
/** Stream returning all the messages from all the configured partitions.
* If checkpointing is enabled, the stream starts from the last saved position,
* otherwise it starts from the beginning.
*
* @param offset Starting position using the offset property in the messages
* @param startTime Starting position expressed in time
* @param timeOffset Whether the start point is a timestamp
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
private[this] def source(
offset: String,
startTime: Instant,
timeOffset: Boolean): Source[IoTMessage, NotUsed] = {
def source(withCheckpoints: Boolean): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = false,
offsets = fromStart,
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
}
/** Stream returning all the messages starting from the given offset, from all
* the configured partitions.
*
* @param offsets Starting position for all the partitions
*
* @return A source of IoT messages
*/
def source(offsets: List[Offset]): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = false,
offsets = Some(offsets),
withCheckpoints = false)
}
/** Stream returning all the messages starting from the given time, from all
* the configured partitions.
*
* @param startTime Starting position expressed in time
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(startTime: Instant, withCheckpoints: Boolean): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = true,
startTime = startTime,
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
}
/** Stream returning all the messages starting from the given offset, from all
* the configured partitions.
*
* @param offsets Starting position for all the partitions
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(offsets: List[Offset], withCheckpoints: Boolean): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = false,
offsets = Some(offsets),
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
}
/** Stream returning all the messages, from the given starting point, optionally with
* checkpointing
*
* @param offsets Starting positions using the offset property in the messages
* @param startTime Starting position expressed in time
* @param withTimeOffset Whether the start point is a timestamp
* @param withCheckpoints Whether to read/write the stream position
*
* @return A source of IoT messages
*/
private[this] def getSource(
offsets: Option[List[Offset]] = None,
startTime: Instant = Instant.MIN,
withTimeOffset: Boolean = false,
withCheckpoints: Boolean = true): Source[IoTMessage, NotUsed] = {
val graph = GraphDSL.create() {
implicit b =>
@ -105,8 +133,12 @@ class IoTHub {
val merge = b.add(Merge[IoTMessage](Configuration.iotHubPartitions))
for (p <- 0 until Configuration.iotHubPartitions) {
val graph = if (timeOffset) IoTMessageSource(p, startTime) else IoTMessageSource(p, offset)
for (partition <- 0 until Configuration.iotHubPartitions) {
val graph = if (withTimeOffset)
IoTHubPartition(partition).source(startTime, withCheckpoints)
else
IoTHubPartition(partition).source(offsets.get(partition), withCheckpoints)
val source = Source.fromGraph(graph).async
source ~> merge
}


@ -0,0 +1,197 @@
// Copyright (c) Microsoft. All rights reserved.
package com.microsoft.azure.iot.iothubreact.scaladsl
import java.time.Instant
import akka.NotUsed
import akka.pattern.ask
import akka.stream.scaladsl.Source
import akka.util.Timeout
import com.microsoft.azure.eventhubs.PartitionReceiver
import com.microsoft.azure.iot.iothubreact._
import com.microsoft.azure.iot.iothubreact.checkpointing.CheckpointService.GetOffset
import com.microsoft.azure.iot.iothubreact.checkpointing.{CheckpointActorSystem, SavePositionOnPull, Configuration => CPConfiguration}
import com.microsoft.azure.iot.iothubreact.filters.Ignore
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
object IoTHubPartition extends Logger {
// Public constant: offset position used to start reading from the beginning
final val OffsetStartOfStream: String = PartitionReceiver.START_OF_STREAM
// Public constant: used internally to signal when there is no position saved in the storage
// To be used by custom backend implementations
final val OffsetCheckpointNotFound: String = "{offset checkpoint not found}"
/** Create a streaming source to retrieve messages from one Azure IoT Hub partition
*
* @param partition IoT hub partition number
*
* @return IoT hub instance
*/
def apply(partition: Int): IoTHubPartition = new IoTHubPartition(partition)
}
/** Provide a streaming source to retrieve messages from one Azure IoT Hub partition
*
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
*
* @todo (*) Provide ClearCheckpoints() method to clear the state
* @todo Support reading the same partition from multiple clients
*/
class IoTHubPartition(val partition: Int) extends Logger {
/** Stream returning all the messages. If checkpointing is enabled in the global configuration
* then the stream starts from the last position saved, otherwise it starts from the beginning.
*
* @return A source of IoT messages
*/
def source(): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = false,
offset = Offset(IoTHubPartition.OffsetStartOfStream),
withCheckpoints = false)
}
/** Stream returning all the messages starting from the given time
*
* @param startTime Starting position expressed in time
*
* @return A source of IoT messages
*/
def source(startTime: Instant): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = true,
startTime = startTime,
withCheckpoints = false)
}
/** Stream returning all the messages. If checkpointing is enabled, the stream starts from the
* last saved position, otherwise it starts from the beginning.
*
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(withCheckpoints: Boolean): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = false,
offset = Offset(IoTHubPartition.OffsetStartOfStream),
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
}
/** Stream returning all the messages from the given offset
*
* @param offset Starting position, offset of the first message
*
* @return A source of IoT messages
*/
def source(offset: Offset): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = false,
offset = offset,
withCheckpoints = false)
}
/** Stream returning all the messages starting from the given time
*
* @param startTime Starting position expressed in time
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(startTime: Instant, withCheckpoints: Boolean): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = true,
startTime = startTime,
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
}
/** Stream returning all the messages from the given offset
*
* @param offset Starting position, offset of the first message
* @param withCheckpoints Whether to read/write the stream position (default: true)
*
* @return A source of IoT messages
*/
def source(offset: Offset, withCheckpoints: Boolean): Source[IoTMessage, NotUsed] = {
getSource(
withTimeOffset = false,
offset = offset,
withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
}
/** Create a stream returning all the messages for the defined partition, from the given start
* point, optionally with checkpointing
*
* @param withTimeOffset Whether the start point is a timestamp
* @param offset Starting position using the offset property in the messages
* @param startTime Starting position expressed in time
* @param withCheckpoints Whether to read/write the stream position
*
* @return A source of IoT messages
*/
private[this] def getSource(
withTimeOffset: Boolean,
offset: Offset = Offset(""),
startTime: Instant = Instant.MIN,
withCheckpoints: Boolean = true): Source[IoTMessage, NotUsed] = {
// Load the offset from the storage (if needed)
var _offset = offset.value
var _withTimeOffset = withTimeOffset
if (withCheckpoints) {
val savedOffset = GetSavedOffset()
if (savedOffset != IoTHubPartition.OffsetCheckpointNotFound) {
_offset = savedOffset
_withTimeOffset = false
log.info(s"Starting partition ${partition} from saved offset ${_offset}")
}
}
// Build the source starting by time or by offset
val source: Source[IoTMessage, NotUsed] = if (_withTimeOffset)
IoTMessageSource(partition, startTime, withCheckpoints).filter(Ignore.keepAlive)
else
IoTMessageSource(partition, _offset, withCheckpoints).filter(Ignore.keepAlive)
// Inject a flow to store the stream position after each pull
if (withCheckpoints) {
log.debug(s"Adding checkpointing flow to the partition ${partition} stream")
source.via(new SavePositionOnPull(partition))
} else {
source
}
}
/** Get the offset saved for the current partition
*
* @return Offset
*/
private[this] def GetSavedOffset(): String = {
val partitionCp = CheckpointActorSystem.getCheckpointService(partition)
implicit val rwTimeout = Timeout(CPConfiguration.checkpointRWTimeout)
try {
Retry(3, 5 seconds) {
log.debug(s"Loading the stream position for partition ${partition}")
val future = (partitionCp ? GetOffset).mapTo[String]
Await.result(future, rwTimeout.duration)
}
} catch {
case e: java.util.concurrent.TimeoutException => {
log.error(e, "Timeout while retrieving the offset from the storage")
throw e
}
case e: Exception => {
log.error(e, e.getMessage)
throw e
}
}
}
}


@ -5,9 +5,36 @@ iothub {
keyName = ${?IOTHUB_CI_ACCESS_KEY0_NAME}
key = ${?IOTHUB_CI_ACCESS_KEY0_VALUE}
devices = ${?IOTHUB_CI_DEVICES_JSON_FILE}
}
iothub-stream {
receiverBatchSize = 3
receiverTimeout = 3s
receiverTimeout = 5s
}
iothub-checkpointing {
enabled = true
frequency = 15s
countThreshold = 2000
timeThreshold = 5min
storage {
rwTimeout = 6s
backendType = "AzureBlob"
namespace = "iothub-react-checkpoints"
azureblob {
lease = 15s
useEmulator = false
protocol = "https"
account = ${?IOTHUB_CHECKPOINT_ACCOUNT}
key = ${?IOTHUB_CHECKPOINT_KEY}
}
cassandra {
cluster = "localhost:9042"
replicationFactor = 1
}
}
}
akka {


@ -0,0 +1,46 @@
[
{
"deviceId": "device10000",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10001",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10002",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10003",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10004",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10005",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10006",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10007",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10008",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10009",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10010",
"primaryKey": "...your device key..."
}
]


@ -9,14 +9,14 @@ import akka.actor.Props
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import com.microsoft.azure.iot.iothubreact.IoTMessage
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHub
import com.microsoft.azure.iot.iothubreact.test.helpers.{Counter, Device, Logger, ReactiveStreaming}
import com.microsoft.azure.iot.iothubreact.scaladsl.{IoTHub, IoTHubPartition}
import com.microsoft.azure.iot.iothubreact.test.helpers._
import org.scalatest._
import resource._
import scala.collection.parallel.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
/** Tests streaming against Azure IoT hub endpoint
*
@ -44,12 +44,13 @@ class IoTHubReactiveStreamingUserStory
scenario("Developer wants to retrieve IoT messages") {
Given("An IoT hub is configured")
val hub = new IoTHub()
val hub = IoTHub()
val hubPartition = IoTHubPartition(1)
When("A developer wants to fetch messages from Azure IoT hub")
val messagesFromOnePartition: Source[IoTMessage, NotUsed] = hub.source(1)
val messagesFromAllPartitions: Source[IoTMessage, NotUsed] = hub.source
val messagesFromNowOn: Source[IoTMessage, NotUsed] = hub.source(Instant.now())
val messagesFromOnePartition: Source[IoTMessage, NotUsed] = hubPartition.source(false)
val messagesFromAllPartitions: Source[IoTMessage, NotUsed] = hub.source(false)
val messagesFromNowOn: Source[IoTMessage, NotUsed] = hub.source(Instant.now(), false)
Then("The messages are presented as a stream")
messagesFromOnePartition.to(Sink.ignore)
@ -61,38 +62,27 @@ class IoTHubReactiveStreamingUserStory
// How many seconds we allow the test to wait for messages from the stream
val TestTimeout = 60 seconds
val DevicesCount = 5
val MessagesPerDevice = 4
val expectedMessageCount = DevicesCount * MessagesPerDevice
// A label shared by all the messages, to filter out data sent by other tests
val testRunId: String = "[RetrieveAll-" + java.util.UUID.randomUUID().toString + "]"
// We'll use this as the streaming start date
val startTime = Instant.now()
log.info(s"Test run: ${testRunId}, Start time: ${startTime}")
Given("An IoT hub is configured")
val messages = new IoTHub().source(startTime)
val messages = IoTHub().source(startTime, false)
And("5 devices have sent 4 messages each")
var expectedMessageCount: Long = 0
for {
device1 <- managed(new Device("device10000"))
device2 <- managed(new Device("device10001"))
device3 <- managed(new Device("device10002"))
device4 <- managed(new Device("device10003"))
device5 <- managed(new Device("device10004"))
} {
for (i <- 1 to 4) {
device1.sendMessage(testRunId, i)
device2.sendMessage(testRunId, i)
device3.sendMessage(testRunId, i)
device4.sendMessage(testRunId, i)
device5.sendMessage(testRunId, i)
expectedMessageCount += 5
}
log.info(s"Messages sent: $expectedMessageCount")
And(s"${DevicesCount} devices have sent ${MessagesPerDevice} messages each")
for (i <- 0 until DevicesCount) {
val device = new Device("device" + (10000 + i))
for (i <- 1 to MessagesPerDevice) device.sendMessage(testRunId, i)
device.disconnect()
}
log.info(s"Messages sent: $expectedMessageCount")
When("A client application processes messages from the stream")
counter ! "reset"
@ -113,7 +103,7 @@ class IoTHubReactiveStreamingUserStory
Thread.sleep(pause)
time -= pause
actualMessageCount = readCounter
log.info(s"Messages received so far: ${actualMessageCount} [Time left ${time / 1000} secs]")
log.info(s"Messages received so far: ${actualMessageCount} of ${expectedMessageCount} [Time left ${time / 1000} secs]")
}
assert(
@ -121,53 +111,40 @@ class IoTHubReactiveStreamingUserStory
s"Expecting ${expectedMessageCount} messages but received ${actualMessageCount}")
}
// Note: messages are sent in parallel to obtain some level of mix in the
// storage, so do not refactor, i.e. don't do one device at a time.
scenario("Customer needs to process IoT messages in the right order") {
// How many seconds we allow the test to wait for messages from the stream
val TestTimeout = 120 seconds
val DevicesCount = 10
val MessagesPerDevice = 200
val expectedMessageCount = DevicesCount * MessagesPerDevice
// A label shared by all the messages, to filter out data sent by other tests
val testRunId: String = "[VerifyOrder-" + java.util.UUID.randomUUID().toString + "]"
// We'll use this as the streaming start date
val startTime = Instant.now()
log.info(s"Test run: ${testRunId}, Start time: ${startTime}")
Given("An IoT hub is configured")
val messages = new IoTHub().source(startTime)
val messages = IoTHub().source(startTime, false)
And(s"10 devices have sent ${MessagesPerDevice} messages each")
var expectedMessageCount: Int = 0
for {
device1 <- managed(new Device("device10000"))
device2 <- managed(new Device("device10001"))
device3 <- managed(new Device("device10002"))
device4 <- managed(new Device("device10003"))
device5 <- managed(new Device("device10004"))
device6 <- managed(new Device("device10005"))
device7 <- managed(new Device("device10006"))
device8 <- managed(new Device("device10007"))
device9 <- managed(new Device("device10008"))
device10 <- managed(new Device("device10009"))
} {
for (i <- 1 to MessagesPerDevice) {
device1.sendMessage(testRunId, i)
device2.sendMessage(testRunId, i)
device3.sendMessage(testRunId, i)
device4.sendMessage(testRunId, i)
device5.sendMessage(testRunId, i)
device6.sendMessage(testRunId, i)
device7.sendMessage(testRunId, i)
device8.sendMessage(testRunId, i)
device9.sendMessage(testRunId, i)
device10.sendMessage(testRunId, i)
expectedMessageCount += 10
}
And(s"${DevicesCount} devices have sent ${MessagesPerDevice} messages each")
val devices = new collection.mutable.ListMap[Int, Device]()
log.info(s"Messages sent: $expectedMessageCount")
}
for (i <- 0 until DevicesCount)
devices(i) = new Device("device" + (10000 + i))
for (i <- 1 to MessagesPerDevice)
for (d <- 0 until DevicesCount)
devices(d).sendMessage(testRunId, i)
for (i <- 0 until DevicesCount)
devices(i).disconnect()
log.info(s"Messages sent: $expectedMessageCount")
When("A client application processes messages from the stream")
@ -198,13 +175,13 @@ class IoTHubReactiveStreamingUserStory
// Wait till all messages have been verified
var time = TestTimeout.toMillis.toInt
val pause = time / 10
val pause = time / 12
var actualMessageCount = readCounter
while (time > 0 && actualMessageCount < expectedMessageCount) {
Thread.sleep(pause)
time -= pause
actualMessageCount = readCounter
log.info(s"Messages received so far: ${actualMessageCount} [Time left ${time / 1000} secs]")
log.info(s"Messages received so far: ${actualMessageCount} of ${expectedMessageCount} [Time left ${time / 1000} secs]")
}
assert(


@ -2,6 +2,8 @@
package com.microsoft.azure.iot.iothubreact.test.helpers
import java.nio.file.{Files, Paths}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.microsoft.azure.eventhubs.EventHubClient
@ -23,14 +25,19 @@ private object Configuration {
// Tests can override these
var iotReceiverConsumerGroup: String = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME
var receiverTimeout : Long = conf.getDuration("iothub.receiverTimeout").toMillis
var receiverBatchSize : Int = conf.getInt("iothub.receiverBatchSize")
var receiverTimeout : Long = conf.getDuration("iothub-stream.receiverTimeout").toMillis
var receiverBatchSize : Int = conf.getInt("iothub-stream.receiverBatchSize")
// Read devices configuration from JSON file
private[this] val jsonParser = new ObjectMapper()
jsonParser.registerModule(DefaultScalaModule)
private[this] lazy val devicesJson = File(conf.getString("iothub.devices")).slurp()
private[this] lazy val devices = jsonParser.readValue(devicesJson, classOf[Array[DeviceCredentials]])
private[this] lazy val devicesJsonFile = conf.getString("iothub.devices")
private[this] lazy val devicesJson = File(devicesJsonFile).slurp()
private[this] lazy val devices = jsonParser.readValue(devicesJson, classOf[Array[DeviceCredentials]])
def deviceCredentials(id: String): DeviceCredentials = devices.find(x => x.deviceId == id).get
if (!Files.exists(Paths.get(devicesJsonFile))) {
throw new RuntimeException("Devices credentials not found")
}
}


@ -3,19 +3,9 @@
package com.microsoft.azure.iot.iothubreact.test.helpers
import com.microsoft.azure.iothub._
import resource.Resource
/* Companion object required by `Resource` for type inference */
object Device {
implicit object TypeClassResourceHelper extends Resource[Device] {
override def close(r: Device): Unit = r.disconnect()
}
}
/* Test helper to send messages to the hub */
class Device(deviceId: String) extends Resource[Device] with Logger {
class Device(deviceId: String) extends Logger {
private[this] class EventCallback extends IotHubEventCallback {
override def execute(status: IotHubStatusCode, context: scala.Any): Unit = {
@ -34,14 +24,6 @@ class Device(deviceId: String) extends Resource[Device] with Logger {
// Prepare client to send messages
private[this] lazy val client = new DeviceClient(connString, IotHubClientProtocol.AMQPS)
/** Note: automatically invoked by Resource[A]
*
* @param resource Resource being disposed
*/
override def close(resource: Device): Unit = {
resource.disconnect()
}
def disconnect(): Unit = {
client.close()
}


@ -0,0 +1,65 @@
# Device simulator
The program simulates some Temperature and Humidity sensors. The values gradually
increase and decrease over a 6-minute cycle, with random peaks.
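The values follow a sine wave over a 360-second cycle, with the amplitude re-randomized at every zero crossing. This is a minimal sketch of the idea; the function and variable names are illustrative, not part of the simulator code:

```javascript
// Illustrative sketch of the simulators' value generation: a sine wave over a
// 360-second (6 minute) cycle, scaled by a random peak so cycles differ.
function sample(mid, peak, elapsedSeconds) {
    var sec = elapsedSeconds % 360;       // position within the 6-minute cycle
    var rad = sec * Math.PI / 180;        // one second == one degree
    var measure = mid + Math.sin(rad) * peak;
    return Math.floor(measure * 10) / 10; // truncate after the 1st decimal
}

// Temperature uses mid=15, range=25, so values stay within -10 .. 40
var peak = 25 * Math.random();
var reading = sample(15, peak, 90);       // 90s in: near the cycle's maximum
```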
# How to run the device simulator
First of all, you should create an Azure IoT hub instance. You can create a free instance
by selecting the "F1 Free" scale tier.
Once you have an IoT hub ready, you should take note of:
* the **connection string** from the **Shared access policies** panel, for
the **iothubowner** policy.
## Create the devices
Creating devices is very simple, thanks to the IoT hub explorer, which you can run from
the command line.
From here on, use a terminal with Node.js and the Node Package Manager (npm) available.
To install IoT hub explorer, open a terminal and execute
```bash
npm install -g iothub-explorer
```
Once the IoT hub explorer is installed, proceed to create the devices:
* **Login, using the connection string obtained earlier:**
```bash
CONNSTRING="... iothubowner connection string ..."
iothub-explorer login "$CONNSTRING"
```
* **Create some devices**, e.g. using a Bash terminal:
```bash
for i in {1000..1010}; do iothub-explorer create device$i --display="deviceId"; done
```
## Prepare the simulator
In order to run, the simulator needs the credentials for each device just created.
The following command creates a `credentials.js` file with the settings required.
From the terminal, `cd` into the same folder as this README document, and execute:
```bash
export CONNSTRING="... iothubowner connection string ..."
./download_credentials.sh
unset CONNSTRING
```
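The generated `credentials.js` is a plain script defining two globals, later loaded by `send.js` via `runInThisContext`. Its exact content depends on the `iothub-explorer list --raw` output; the shape below is an assumption based on how `send.js` reads it (`hubDevices[i][0].deviceId`):

```javascript
// Hypothetical content of credentials.js (values shortened for readability)
var connString = "HostName=myhub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=...";
var hubDevices = [
    [{
        deviceId: "device1000",
        authentication: {SymmetricKey: {primaryKey: "...device key..."}}
    }],
    [{
        deviceId: "device1001",
        authentication: {SymmetricKey: {primaryKey: "...device key..."}}
    }]
];
```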
## Run the simulator
Open a bash terminal in the same folder as this README document and execute:
```bash
npm run send
```
The simulator will start sending data every second, for each device created.
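Each message body is a small JSON document with the reading and an ISO-8601 timestamp; the message type ("temperature" or "humidity") travels as a `model` application property rather than in the body, so consumers can filter by it. A sketch of the payload:

```javascript
// Shape of the body produced by the simulators' generateData()
var payload = JSON.stringify({
    value: 21.4,                      // the current reading, one decimal
    time: new Date().toISOString()    // e.g. "2016-10-26T16:13:10.000Z"
});
// The type is attached as an application property on the message itself:
//   message.properties.add("model", "temperature");
```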


@ -0,0 +1,111 @@
// Copyright (c) Microsoft. All rights reserved.
"use strict";
var Client = require("azure-iot-device").Client;
var Message = require("azure-iot-device").Message;
var debug = false;
/**
@abstract
@constructor
*/
function AbstractSimulator(hubName, deviceId, accessKey, protocol, frequency) {
if (this.constructor === AbstractSimulator) {
throw new Error("AbstractSimulator is an abstract class, cannot create instance");
}
this.name = deviceId;
this.connectionString = "HostName=" + hubName + ".azure-devices.net;DeviceId=" + deviceId + ";SharedAccessKey=" + accessKey;
this.protocol = protocol;
this.frequency = frequency;
this.connectionStatus = "disconnected";
this.clock = null;
this.model = "";
}
/**
@abstract
*/
AbstractSimulator.prototype.generateData = function () {
throw new Error("abstract method 'generateData' not implemented. Must return a <JSON string> with the data (e.g. using JSON.stringify).");
};
AbstractSimulator.prototype.getConnectCallback = function () {
var self = this;
return function (err) {
if (err) {
self.connectionStatus = "failed";
console.error("[" + self.name + "] Could not connect: " + err.message);
} else {
if (self.connectionStatus !== "connecting") {
return;
}
self.connectionStatus = "connected";
console.log("[" + self.name + "] Client connected");
self.client.on("message", function (msg) {
console.log("[" + self.name + "] Id: " + msg.messageId + " Body: " + msg.data);
self.client.complete(msg, self.printResultFor("completed"));
});
self.client.on("error", function (err) {
console.error("[" + self.name + "] " + err.message);
});
self.client.on("disconnect", function () {
clearInterval(self.clock);
self.connectionStatus = "disconnected";
console.log("[" + self.name + "] Disconnected.");
self.client.removeAllListeners();
self.client.open(self.getConnectCallback());
});
}
};
};
AbstractSimulator.prototype.connect = function () {
this.connectionStatus = "connecting";
this.client = Client.fromConnectionString(this.connectionString, this.protocol);
this.client.open(this.getConnectCallback());
};
AbstractSimulator.prototype.startSending = function () {
var self = this;
this.clock = setInterval(function () {
// Verify if the client is connected yet
if (self.connectionStatus === "connecting") {
console.error("[" + self.name + "] The client is not connected yet... [" + self.connectionStatus + "]");
return;
}
if (self.connectionStatus !== "connected") {
console.error("[" + self.name + "] The client could not connect [" + self.connectionStatus + "]");
return;
}
var message = new Message(self.generateData());
if (self.model !== "") {
message.properties.add("model", self.model);
console.log("[" + self.name + "] Sending " + self.model + ": " + message.getData());
} else {
console.log("[" + self.name + "] Sending message: " + message.getData());
}
self.client.sendEvent(message, self.printResultFor("send", self.name));
}, self.frequency);
};
AbstractSimulator.prototype.printResultFor = function (op, name) {
return function printResult(err, res) {
if (err) {
console.log("[" + name + "] " + op + " error: " + err.toString());
}
if (res) {
if (debug) console.log("[" + name + "] " + op + " status: " + res.constructor.name);
}
};
};
module.exports = AbstractSimulator;


@ -0,0 +1,5 @@
#!/usr/bin/env bash
DATA=$(iothub-explorer list --display="deviceId,authentication.SymmetricKey.primaryKey," --raw|sort|awk '{ print $0 ","}')
echo "var connString = '$CONNSTRING';" > credentials.js
echo "var hubDevices = [$DATA];" >> credentials.js


@ -0,0 +1,48 @@
// Copyright (c) Microsoft. All rights reserved.
"use strict";
var AbstractSimulator = require("./abstract_simulator.js");
// Inheritance
var HumiditySimulator = function () {
AbstractSimulator.apply(this, arguments);
this.model = "humidity";
// (70-30) .. (70+30) => 40 .. 100
this.mid = 70;
this.range = 30;
this.peak = this.range * Math.random();
};
HumiditySimulator.prototype = Object.create(AbstractSimulator.prototype);
HumiditySimulator.prototype.constructor = HumiditySimulator;
// Implement generateData abstract
HumiditySimulator.prototype.generateData = function () {
var d = new Date();
var h = d.getHours();
var m = d.getMinutes();
var s = d.getSeconds();
var sec = ((((h * 60) + m) * 60 + s) + 180) % 360;
// Trend changes when Math.sin() = 0
if (sec === 0 || sec === 180) {
this.peak = this.range * Math.random();
}
// Sinusoidal values
var rad = sec * Math.PI / 180;
var measure = this.mid + Math.sin(rad) * this.peak;
// Truncate after 1st decimal
measure = Math.floor(measure * 10) / 10;
return JSON.stringify({
value: measure,
time: new Date().toISOString()
});
};
module.exports = HumiditySimulator;


@ -0,0 +1,16 @@
{
"name": "iothub-react-temperature-simulator",
"version": "0.0.1",
"private": true,
"description": "Simple simulator sending Temperature data to Azure IoT Hub",
"author": "Microsoft Corporation",
"license": "MIT",
"dependencies": {
"azure-iot-device": "1.0.15",
"azure-iot-device-amqp": "1.0.16",
"azure-iot-device-http": "1.0.16"
},
"scripts": {
"send": "node send.js"
}
}


@ -0,0 +1,36 @@
// Copyright (c) Microsoft. All rights reserved.
"use strict";
// How frequently to send data
var frequency = 1000;
// Available protocols
//var Protocol = require("azure-iot-device-amqp").Amqp;
var Protocol = require("azure-iot-device-http").Http;
// Load credentials
require("vm").runInThisContext(require("fs").readFileSync(__dirname + "/credentials.js"));
var hubName = connString.substring(connString.indexOf("=") + 1, connString.indexOf(".azure-devices.net"));
// Instantiate simulators
var TemperatureSimulator = require("./temperature_simulator.js");
var HumiditySimulator = require("./humidity_simulator.js");
// Connect simulators to Azure IoT hub
var devices = [];
var j = 0;
for (var i = 0; i < hubDevices.length ; i++) {
var deviceId = hubDevices[i][0].deviceId;
var accessKey = hubDevices[i][0].authentication.SymmetricKey.primaryKey;
devices[j] = new TemperatureSimulator(hubName, deviceId, accessKey, Protocol, frequency);
devices[j++].connect();
devices[j] = new HumiditySimulator(hubName, deviceId, accessKey, Protocol, frequency);
devices[j++].connect();
}
// Start sending data generated by the simulators
for (var i = 0; i < devices.length ; i++) {
devices[i].startSending();
}


@ -0,0 +1,51 @@
// Copyright (c) Microsoft. All rights reserved.
"use strict";
var AbstractSimulator = require("./abstract_simulator.js");
// Inheritance
var TemperatureSimulator = function () {
AbstractSimulator.apply(this, arguments);
this.model = "temperature";
// (15-25) .. (15+25) => -10 .. 40
this.mid = 15;
this.range = 25;
this.peak = this.range * Math.random();
};
TemperatureSimulator.prototype = Object.create(AbstractSimulator.prototype);
TemperatureSimulator.prototype.constructor = TemperatureSimulator;
// Implement generateData abstract
TemperatureSimulator.prototype.generateData = function () {
var d = new Date();
var h = d.getHours();
var m = d.getMinutes();
var s = d.getSeconds();
var sec = (((h * 60) + m) * 60 + s) % 360;
// Trend changes when Math.sin() = 0
if (sec === 0 || sec === 180) {
this.peak = this.range * Math.random();
}
// Sinusoidal values
var rad = sec * Math.PI / 180;
var measure = this.mid + Math.sin(rad) * this.peak;
// Truncate after 1st decimal
measure = Math.floor(measure * 10) / 10;
// Convert to Fahrenheit
// measure = measure * 1.8 + 32
return JSON.stringify({
value: measure,
time: new Date().toISOString()
});
};
module.exports = TemperatureSimulator;