From 0b393e643ec58d61f19fa8fbfd45374bba36b959 Mon Sep 17 00:00:00 2001 From: Devis Lucato Date: Wed, 26 Oct 2016 16:13:10 -0700 Subject: [PATCH] 0.7.0 release (#8) * Add checkpointing feature, saving the current position of the streams, and automatically restart from the previous psition. The position can be saved either in Azure blobs or Cassandra. * Support filtering the stream by message type (aka "model"), so that a device can send different kind of telemetry. * Keep alive streams in absence of traffic * Add device simulator app to simulate temperature and humidity sensors * Add support for Scala 2.12.0-RC1 * Overall refactoring to reduce the amount of code on the client side * Retry logic on storage operations * Add Travis CI config * Improve documentation * Remove dependency on scala-arm * Start move to json4s * Change code style to 100 columns * Add script to run the sample demos * API changes, split IoTHub in two classes --- .gitignore | 5 + .travis.yml | 16 ++ CHECKPOINTING.md | 88 ++++++++ Codestyle.IntelliJ.xml | 42 ++-- README.md | 213 +++++++++++++----- build.sbt | 58 +++-- devices.json.enc | Bin 0 -> 976 bytes samples-java/pom.xml | 15 +- samples-java/run_samples.cmd | 2 + samples-java/run_samples.sh | 2 + .../src/main/java/DisplayMessages/Demo.java | 46 +++- .../java/DisplayMessages/Temperature.java | 8 + .../src/main/resources/application.conf | 52 +++++ samples-scala/build.sbt | 17 +- samples-scala/run_samples.cmd | 1 + samples-scala/run_samples.sh | 1 + .../src/main/resources/application.conf | 52 +++++ .../A_OutputMessagesToConsole/Demo.scala | 45 ++++ .../ISO8601DateTime.scala} | 26 +-- .../Temperature.scala | 17 ++ .../Demo.scala | 13 +- .../Monitoring.scala | 3 +- .../src/main/scala/C_Checkpoints/Demo.scala | 29 +++ .../ReactiveStreaming.scala | 21 -- .../scala/OutputMessagesToConsole/Demo.scala | 44 ---- .../ReactiveStreaming.scala | 21 -- src/main/resources/reference.conf | 54 ++++- .../azure/iot/iothubreact/Configuration.scala | 19 +- .../azure/iot/iothubreact/IoTHubStorage.scala | 7 +- .../azure/iot/iothubreact/IoTMessage.scala | 20 +- .../iot/iothubreact/IoTMessageSource.scala | 169 ++++++++------ .../azure/iot/iothubreact/Logger.scala | 2 +- .../azure/iot/iothubreact/Offset.scala | 15 ++ .../azure/iot/iothubreact/ResumeOnError.scala | 28 +++ .../azure/iot/iothubreact/Retry.scala | 42 ++++ .../azure/iot/iothubreact/StopOnError.scala | 26 +++ .../checkpointing/Backends/AzureBlob.scala | 178 +++++++++++++++ .../Backends/CheckpointBackend.scala | 25 ++ .../checkpointing/CheckpointActorSystem.scala | 36 +++ .../checkpointing/CheckpointService.scala | 181 +++++++++++++++ .../checkpointing/Configuration.scala | 156 +++++++++++++ .../checkpointing/SavePositionOnPull.scala | 50 ++++ .../backends/CassandraTable.scala | 47 ++++ .../backends/cassandra/CheckpointRecord.scala | 21 ++ .../cassandra/CheckpointsTableSchema.scala | 23 ++ .../backends/cassandra/lib/Column.scala | 50 ++++ .../backends/cassandra/lib/Connection.scala | 73 ++++++ .../backends/cassandra/lib/Record.scala | 9 + .../backends/cassandra/lib/Table.scala | 61 +++++ .../backends/cassandra/lib/TableSchema.scala | 16 ++ .../iot/iothubreact/filters/Ignore.scala | 17 ++ .../azure/iot/iothubreact/filters/Model.scala | 13 ++ .../iot/iothubreact/javadsl/IoTHub.scala | 108 +++++---- .../iothubreact/javadsl/IoTHubPartition.scala | 94 ++++++++ .../iot/iothubreact/scaladsl/IoTHub.scala | 166 ++++++++------ .../scaladsl/IoTHubPartition.scala | 197 ++++++++++++++++ src/test/resources/application.conf | 29 ++- src/test/resources/devices.json | 46 ++++ .../IoTHubReactiveStreamingUserStory.scala | 101 ++++----- .../test/helpers/Configuration.scala | 15 +- .../iot/iothubreact/test/helpers/Device.scala | 20 +- tools/devices-simulator/README.md | 65 ++++++ tools/devices-simulator/abstract_simulator.js | 111 +++++++++ .../devices-simulator/download_credentials.sh | 5 + tools/devices-simulator/humidity_simulator.js | 48 ++++ tools/devices-simulator/package.json | 16 ++ tools/devices-simulator/send.js | 36 +++ .../temperature_simulator.js | 51 +++++ 68 files changed, 2754 insertions(+), 529 deletions(-) create mode 100644 .travis.yml create mode 100644 CHECKPOINTING.md create mode 100644 devices.json.enc create mode 100755 samples-java/run_samples.cmd create mode 100755 samples-java/run_samples.sh create mode 100644 samples-java/src/main/java/DisplayMessages/Temperature.java create mode 100755 samples-scala/run_samples.cmd create mode 100755 samples-scala/run_samples.sh create mode 100644 samples-scala/src/main/scala/A_OutputMessagesToConsole/Demo.scala rename samples-scala/src/main/scala/{OutputMessagesToConsole/Temperature.scala => A_OutputMessagesToConsole/ISO8601DateTime.scala} (51%) create mode 100644 samples-scala/src/main/scala/A_OutputMessagesToConsole/Temperature.scala rename samples-scala/src/main/scala/{MessagesThroughput => B_MessagesThroughput}/Demo.scala (86%) rename samples-scala/src/main/scala/{MessagesThroughput => B_MessagesThroughput}/Monitoring.scala (96%) create mode 100644 samples-scala/src/main/scala/C_Checkpoints/Demo.scala delete mode 100644 samples-scala/src/main/scala/MessagesThroughput/ReactiveStreaming.scala delete mode 100644 samples-scala/src/main/scala/OutputMessagesToConsole/Demo.scala delete mode 100644 samples-scala/src/main/scala/OutputMessagesToConsole/ReactiveStreaming.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/Offset.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/ResumeOnError.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/Retry.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/StopOnError.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Backends/AzureBlob.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Backends/CheckpointBackend.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/SavePositionOnPull.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/CheckpointRecord.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/CheckpointsTableSchema.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Column.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Record.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Table.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/TableSchema.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/filters/Ignore.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/filters/Model.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHubPartition.scala create mode 100644 src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala create mode 100644 src/test/resources/devices.json create mode 100644 tools/devices-simulator/README.md create mode 100644 tools/devices-simulator/abstract_simulator.js create mode 100755 tools/devices-simulator/download_credentials.sh create mode 100644 tools/devices-simulator/humidity_simulator.js create mode 100644 tools/devices-simulator/package.json create mode 100644 tools/devices-simulator/send.js create mode 100644 tools/devices-simulator/temperature_simulator.js diff --git a/.gitignore b/.gitignore index 82a3a52..5ffc3d4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ ### Project specific ### .idea +tools/devices-simulator/credentials.js +*.crt ### MacOS ### .DS_Store @@ -27,6 +29,9 @@ project/plugins/project/ .ensime_cache/ .ensime +### Node.js ### +node_modules/ + ### Intellij ### # Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm # Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..c557c5a --- /dev/null +++ b/.travis.yml @@ -0,0 +1,16 @@ +jdk: oraclejdk8 +language: scala +scala: + - 2.11.8 + - 2.12.0-RC1 +cache: + directories: + - "$HOME/.ivy2" + - "$HOME/.sbt" + - "$HOME/.m2" +notifications: + slack: + secure: S6pcmclrj9vaqHOFMrjgYkF6wXrYF6nB5joYY0rqAwsmTLf7crXRVKZ8txlatpxMHc20Rbw8RQDM6tTka9wwBkHZZfErrcPsS84d5MU9siEkIY42/bAQwuYhxkcgilttgFmSwzLodE72giC/VMhIYCSOyOXIxuR0VtBiPD9Inm9QZ35dZDx3P3nbnaOC4fk+BjdbrX1LB8YL9z5Gy/9TqI90w0FV85XMef75EnSgpqeMD/GMB5hIg+arWVnC2S6hZ91PPCcxCTKBYDjwqUac8mFW/sMFT/yrb2c0NE6ZQqa3dlx/XFyC1X6+7DjJli2Y8OU+FPjY1tQC8JxgVFTbddIgCdUM/5be4uHN/KNs/yF7w1g06ZXK4jhJxxpL4zWINlqDrDmLaqhAtPQkc2CqL3g8MCwYxBbxZY4aFyPfZD7YLdQXDzJZNcfXn9RQQh5y+/zexbGc1zZ/XUo5bK3VbElSs+o2ErI+Sze0FaiK8fW+QeitBdGvjMY7YVKi0Zzf5Dxx1wwxiHR1PQ1r0hA8YZQxwwdpa5lWLFlSVu2w+upPtXqfINMeFktQPbOs1JWIvUvLV0A38dS6R/DsM/W1a3OEVbHQ0Z6OV1nffDnGYPLUl5kRDPFuYYugmCpQHW73lqJdiM0O+Ote4eOQniL1rcajtt+V5cn1/JRWzdJ4PH0= +before_install: +- openssl aes-256-cbc -K $encrypted_13c010a56f48_key -iv $encrypted_13c010a56f48_iv + -in devices.json.enc -out src/test/resources/devices.json -d diff --git a/CHECKPOINTING.md b/CHECKPOINTING.md new file mode 100644 index 0000000..20dcb75 --- /dev/null +++ b/CHECKPOINTING.md @@ -0,0 +1,88 @@ +# Stream position checkpoint + +The library provides a mechanism to restart the stream from a recent *checkpoint*, to be resilient +to restarts and crashes. + +*Checkpoints* are saved automatically, with a configured frequency, on a storage provided. + +For instance, the stream position can be saved every 15 seconds in Azure blobs, or +(soon) a custom backend. + +To store checkpoints in Azure blobs the configuration looks like this: + +``` +iothub-checkpointing { + enabled = true + frequency = 15s + countThreshold = 1000 + timeThreshold = 30s + storage { + rwTimeout = 5s + backendType = "AzureBlob" + namespace = "iothub-react-checkpoints" + azureblob { + lease = 15s + useEmulator = false + protocol = "https" + account = "..." + key = "..." + } + } +} +``` + +Soon it will be possible to plug in custom storage backends implementing a simple +[interface](src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Backends/CheckpointBackend.scala) +to read and write the stream position. + +There is also one API parameter to enabled/disable the mechanism, for example: + +```scala +val start = java.time.Instant.now() +val withCheckpoints = false + +IoTHub.source(start, withCheckpoints) + .map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature])) + .filter(_.value > 100) + .to(console) + .run() +``` + +# Checkpointing behavior + +### Configuration + +The following table describes the impact of the settings within the `iothub-checkpointing` +configuration block. For further information, you can also check the +[reference.conf](src/main/resources/reference.conf) file. + +| Setting | Type | Example | Description | +|---------|------|---------|-------------| +| **enabled** | bool | true | Global switch to enable/disable the checkpointing feature. This value overrides the API parameter "withCheckpoints". | +| **frequency** | duration | 15s | How often to check if the offset in memory should be saved to storage. The check is scheduled for each partition individually. | +| **countThreshold** | int | 1000 | How many messages to stream before saving the position. The setting is applied to each partition individually. The value should be big enough to take into account buffering and batching. | +| **timeThreshold** | duration | 60s | In case of low traffic (i.e. when not reaching countThreshold), save the stream position that is older than this value.| +| storage.**rwTimeout** | duration | 5000ms | How long to wait, when writing to the storage, before triggering a storage timeout exception. | +| storage.**backendType** | string or class name | "AzureBlob" | Currently only "AzureBlob" is supported. The name of the backend, or the class FQDN, to use to write to the storage. This provides a way to inject custom storage logic. | +| storage.**namespace** | string | "mycptable" | The table/container which will contain the checkpoints data. When streaming data from multiple IoT hubs, you can use this setting, to store each stream position to a common storage, but in separate tables/containers. | + +### Runtime + +The following table describes the system behavior, based on **API parameters** and stored **state**. + +| Checkpointing | Start point | Saved position | Behavior | +|:---:|:---:|:-------:|---| +| No | No | No | The stream starts from the beginning +| No | No | **Yes** | The stream starts from the beginning (**the saved position is ignored**) +| No | Yes | No | The stream starts from the 'start point' provided +| No | Yes | **Yes** | The stream starts from the 'start point' provided (**the saved position is ignored**) +| Yes | No | No | The stream starts from the beginning +| Yes | No | **Yes** | **The stream starts from the saved position** +| Yes | Yes | No | The stream starts from the 'start point' provided +| Yes | Yes | **Yes** | **The stream starts from the saved position** + +Legend: +* **Checkpointing**: whether checkpointing (saving the stream position) is enabled or disabled +* **Start point**: whether the client provides a starting position (date or offset) or ask for all +the events from the beginning +* **Saved position**: whether there is a position saved in the storage diff --git a/Codestyle.IntelliJ.xml b/Codestyle.IntelliJ.xml index 157cd7c..0e5064c 100644 --- a/Codestyle.IntelliJ.xml +++ b/Codestyle.IntelliJ.xml @@ -1,22 +1,22 @@ - \ No newline at end of file +