Update sample scripts and configuration setting files

This commit is contained in:
SJ 2017-06-06 19:51:30 -07:00
Родитель 52ef5af91c
Коммит 00e8c211b3
12 изменённых файлов: 90 добавлений и 218 удалений

Просмотреть файл

@ -30,7 +30,7 @@ reactive-eventhubs {
storage {
rwTimeout = 5s
namespace = "iothub-react-checkpoints"
namespace = "eventhub-react-checkpoints"
backendType = "AzureBlob"
azureblob {
@ -59,7 +59,7 @@ reactive-eventhubs {
storage {
rwTimeout = 5s
namespace = "iothub_react_checkpoints"
namespace = "eventhub_react_checkpoints"
backendType = "Cassandra"
cassandra {
@ -73,10 +73,6 @@ reactive-eventhubs {
}
```
We plan to allow plugging in custom storage backends, by implementing a simple
[interface](src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Backends/CheckpointBackend.scala)
to read and write the stream position. Let us know if you are interested!
The checkpointing feature is not enabled by default, so the library will not
save the stream offsets automatically. To use checkpointing, use the
`saveOffsets` option when creating the stream:
@ -86,7 +82,7 @@ val options = SourceOptions()
.fromTime(java.time.Instant.now())
.saveOffsets()
IoTHub().source(options)
EventHub().source(options)
.map(m ⇒ jsonParser.readValue(m.contentAsString, classOf[Temperature]))
.filter(_.value > 100)
.to(console)
@ -98,7 +94,7 @@ IoTHub().source(options)
### Configuration
The following table describes the impact of the settings within the
`iothub-react.checkpointing` configuration block. For further information, you
`eventhub-react.checkpointing` configuration block. For further information, you
can also check the [reference.conf](src/main/resources/reference.conf) file.
| Setting | Type | Example | Description |
@ -107,7 +103,7 @@ can also check the [reference.conf](src/main/resources/reference.conf) file.
| **countThreshold** | int | 1000 | How many messages to stream before saving the position. The setting is applied to each partition individually. The value should be big enough to take into account buffering and batching. |
| **timeThreshold** | duration | 60s | In case of low traffic (i.e. when not reaching countThreshold), save a stream position older than this value.|
| storage.**rwTimeout** | duration | 5000ms | How long to wait, when writing to the storage, before triggering a storage timeout exception. |
| storage.**namespace** | string | "mycptable" | The table/container which will contain the checkpoints data. When streaming data from multiple IoT Hubs, you can use this setting to use separate tables/containers. |
| storage.**namespace** | string | "mycptable" | The table/container which will contain the checkpoints data. When streaming data from multiple Event Hubs, you can use this setting to use separate tables/containers. |
| storage.**backendType** | string or class name | "AzureBlob" | Currently "AzureBlob" and "Cassandra" are supported. The name of the backend, or the class FQDN, to use to write to the storage. This provides a way to inject custom storage logic. |
### Runtime
@ -135,7 +131,7 @@ Legend:
### Edge cases
* Azure IoT Hub stores messages up to 7 days. It's possible that the position
* Azure Event Hub stores messages up to 7 days. It's possible that the position
stored doesn't exist anymore. In such case the stream will start from the
first message available.
* If the checkpoint position is ahead of the last available message, the stream

Просмотреть файл

@ -20,8 +20,6 @@ stream of incoming telemetry data is read, parsed and converted to a
```scala
EventHub().source()
.map(m ⇒ parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
```
@ -32,8 +30,6 @@ and the equivalent code in Java:
TypeReference<Temperature> type = new TypeReference<Temperature>() {};
new EventHub().source()
.map(m -> (Temperature) jsonParser.readValue(m.contentAsString(), type))
.filter(x -> x.value > 100)
.to(console())
.run(streamMaterializer);
```
@ -70,8 +66,6 @@ case class KafkaProducer(bootstrapServer: String)(implicit val system: ActorSyst
val kafkaProducer = KafkaProducer(bootstrapServer)
EventHub().source()
.map(m ⇒ parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.runWith(kafkaProducer.getSink())
```
@ -90,8 +84,6 @@ val p1 = 0
val p2 = 3
EventHub().source(Seq(p1, p2))
.map(m ⇒ parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
```
@ -106,8 +98,6 @@ too:
val start = java.time.Instant.now()
EventHub().source(start)
.map(m ⇒ parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
```
@ -126,8 +116,6 @@ val options = SourceOptions()
.saveOffsets()
EventHub().source(options)
.map(m ⇒ parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
```
@ -186,12 +174,11 @@ authentication values to use, can be found in the
Properties required to receive telemetry:
* **hubName**: see `Endpoints``Messaging``Events``Event Hub-compatible name`
* **hubEndpoint**: see `Endpoints``Messaging``Events``Event Hub-compatible endpoint`
* **hubPartitions**: see `Endpoints``Messaging``Events``Partitions`
* **accessPolicy**: usually `service`, see `Shared access policies`
* **accessKey**: see `Shared access policies``key name``Primary key` (it's
a base64 encoded string)
* **eventHubName**:
* **eventHubEndpoint**:
* **eventHubPartitions**:
* **accessPolicy**:
* **accessKey**:
The values should be stored in your `application.conf` resource (or equivalent).
Optionally you can reference environment settings if you prefer, for example to
@ -201,12 +188,11 @@ hide sensitive data.
reactive-eventhubs {
connection {
hubName = "<Event Hub compatible name>"
hubEndpoint = "<Event Hub compatible endpoint>"
hubPartitions = <the number of partitions in your Event Hub>
eventHubName = "<Event Hub name>"
eventHubEndpoint = "<Event Hub endpoint>"
eventHubPartitions = <the number of partitions in your Event Hub>
accessPolicy = "<access policy name>"
accessKey = "<access policy key>"
accessHostName = "<access host name>"
}
[... other settings...]
@ -219,12 +205,11 @@ Example using environment settings:
reactive-eventhubs {
connection {
hubName = ${?EVENTHUB_NAME}
hubEndpoint = ${?EVENTHUB_ENDPOINT}
hubPartitions = ${?EVENTHUB_PARTITIONS}
eventHubName = ${?EVENTHUB_NAME}
eventHubEndpoint = ${?EVENTHUB_ENDPOINT}
eventHubPartitions = ${?EVENTHUB_PARTITIONS}
accessPolicy = ${?EVENTHUB_ACCESS_POLICY}
accessKey = ${?EVENTHUB_ACCESS_KEY}
accessHostName = ${?EVENTHUB_ACCESS_HOSTNAME}
}
[... other settings...]
@ -272,7 +257,7 @@ cases and how the Reactive Event Hubs API works. All the demos require an
instance of Azure Event Hubs, with some telemetry to stream.
1. **DisplayMessages** [Java]: how to stream Azure Event Hubs telemetry within
a Java application, filtering temperature values greater than 60C
a Java application
1. **AllMessagesFromBeginning** [Scala]: simple example streaming all the events
in the hub.
1. **OnlyRecentMessages** [Scala]: stream all the events, starting from the
@ -282,8 +267,6 @@ instance of Azure Event Hubs, with some telemetry to stream.
1. **MultipleDestinations** [Scala]: shows how to read once and deliver events
to multiple destinations.
1. **CloseStream** [Scala]: show how to close the stream
1. **PrintTemperature** [Scala]: stream all Temperature events and print data to
the console.
1. **Throughput** [Scala]: stream all events and display statistics about the
throughput.
1. **Throttling** [Scala]: throttle the incoming stream to a defined speed of

Просмотреть файл

@ -4,8 +4,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure.iot</groupId>
<artifactId>iothub-react-demo</artifactId>
<groupId>com.microsoft.azure.eventhubs</groupId>
<artifactId>eventhub-react-demo</artifactId>
<version>0.9.0</version>
<repositories>
@ -18,8 +18,8 @@
<dependencies>
<!-- TODO: fix `sbt samplesJava/run` and remove this dependency, using the source code instead -->
<dependency>
<groupId>com.microsoft.azure.iot</groupId>
<artifactId>iothub-react_2.12</artifactId>
<groupId>com.microsoft.azure.eventhubs</groupId>
<artifactId>eventhubs-react_2.12</artifactId>
<version>0.9.0-DEV.170313a</version>
</dependency>
<dependency>

Просмотреть файл

@ -3,40 +3,32 @@
// @see http://doc.akka.io/docs/akka/2.4.10/scala/logging.html
akka {
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "DEBUG"
loglevel = "INFO"
}
iothub-react {
eventhub-react {
// Connection settings can be retrieved from the Azure portal at https://portal.azure.com
// For more information about IoT Hub settings, see:
// https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-create-through-portal#endpoints
// https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-java-java-getstarted
connection {
// see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible name"
hubName = ${?IOTHUB_EVENTHUB_NAME}
// your-namespace.servicebus.windows.net
eventHubEndpoint = ${?EVENTHUB_ENDPOINT}
// see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible endpoint"
hubEndpoint = ${?IOTHUB_EVENTHUB_ENDPOINT}
// eh1
eventHubName = ${?EVENTHUB_NAME}
// see: Endpoints ⇒ Messaging ⇒ Events ⇒ Partitions
hubPartitions = ${?IOTHUB_EVENTHUB_PARTITIONS}
// 2..n
eventHubPartitions = ${?EVENTHUB_PARTITIONS}
// see: "IoT Hub" ⇒ your hub ⇒ "Shared access policies"
// e.g. you should use the predefined "service" policy
accessPolicy = ${?IOTHUB_ACCESS_POLICY}
// SAS policy name
accessPolicy = ${?EVENTHUB_ACCESS_POLICY}
// see: Shared access policies ⇒ key name ⇒ Primary key
accessKey = ${?IOTHUB_ACCESS_KEY}
// see: Shared access policies ⇒ key name ⇒ Connection string ⇒ "HostName"
accessHostName = ${?IOTHUB_ACCESS_HOSTNAME}
// policy key
accessKey = ${?EVENTHUB_ACCESS_KEY}
}
streaming {
// see: "IoT Hub" >> your hub > "Messaging" >> Consumer groups
// "$Default" is predefined and is the typical scenario
consumerGroup = "$Default"
@ -46,17 +38,17 @@ iothub-react {
// How many messages to retrieve on each pull, max is 999
receiverBatchSize = 999
// Whether to retrieve information about the partitions while streming events from IoT Hub
// Whether to retrieve information about the partitions while streaming events from Event Hub
retrieveRuntimeInfo = true
}
checkpointing {
// Checkpoints frequency (best effort), for each IoT hub partition
// Checkpoints frequency (best effort), for each Event hub partition
// Min: 1 second, Max: 1 minute
frequency = 5s
// How many messages to stream before saving the position, for each IoT hub partition.
// How many messages to stream before saving the position, for each Event hub partition.
// Since the stream position is saved in the Source, before the rest of the
// Graph (Flows/Sinks), this provides a mechanism to replay buffered messages.
// The value should be greater than receiverBatchSize
@ -77,7 +69,7 @@ iothub-react {
// If you use the same storage while processing multiple streams, you'll want
// to use a distinct table/container/path in each job, to to keep state isolated
namespace = "iothub-react-checkpoints"
namespace = "eventhub-react-checkpoints"
azureblob {
// Time allowed for a checkpoint to be written, rounded to seconds (min 15, max 60)
@ -86,8 +78,8 @@ iothub-react {
useEmulator = false
// Storage credentials
protocol = "https"
account = ${?IOTHUB_CHECKPOINT_ACCOUNT}
key = ${?IOTHUB_CHECKPOINT_KEY}
account = ${?CHECKPOINT_ACCOUNT}
key = ${?CHECKPOINT_KEY}
}
// You can easily test this with Docker --> docker run -ip 9042:9042 --rm cassandra

Просмотреть файл

@ -6,37 +6,29 @@ akka {
loglevel = "INFO"
}
iothub-react {
eventhub-react {
// Connection settings can be retrieved from the Azure portal at https://portal.azure.com
// For more information about IoT Hub settings, see:
// https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-create-through-portal#endpoints
// https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-java-java-getstarted
connection {
// see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible name"
hubName = ${?IOTHUB_EVENTHUB_NAME}
// your-namespace.servicebus.windows.net
eventHubEndpoint = ${?EVENTHUB_ENDPOINT}
// see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible endpoint"
hubEndpoint = ${?IOTHUB_EVENTHUB_ENDPOINT}
// eh1
eventHubName = ${?EVENTHUB_NAME}
// see: Endpoints ⇒ Messaging ⇒ Events ⇒ Partitions
hubPartitions = ${?IOTHUB_EVENTHUB_PARTITIONS}
// 0..n
eventHubPartitions = ${?EVENTHUB_PARTITIONS}
// see: "IoT Hub" ⇒ your hub ⇒ "Shared access policies"
// e.g. you should use the predefined "service" policy
accessPolicy = ${?IOTHUB_ACCESS_POLICY}
// SAS policy name
accessPolicy = ${?EVENTHUB_ACCESS_POLICY}
// see: Shared access policies ⇒ key name ⇒ Primary key
accessKey = ${?IOTHUB_ACCESS_KEY}
// see: Shared access policies ⇒ key name ⇒ Connection string ⇒ "HostName"
accessHostName = ${?IOTHUB_ACCESS_HOSTNAME}
// policy key
accessKey = ${?EVENTHUB_ACCESS_KEY}
}
streaming {
// see: "IoT Hub" >> your hub > "Messaging" >> Consumer groups
// "$Default" is predefined and is the typical scenario
consumerGroup = "$Default"
@ -46,17 +38,17 @@ iothub-react {
// How many messages to retrieve on each pull, max is 999
receiverBatchSize = 999
// Whether to retrieve information about the partitions while streaming events from IoT Hub
// Whether to retrieve information about the partitions while streaming events from Event Hub
retrieveRuntimeInfo = true
}
checkpointing {
// Checkpoints frequency (best effort), for each IoT hub partition
// Checkpoints frequency (best effort), for each Event hub partition
// Min: 1 second, Max: 1 minute
frequency = 5s
// How many messages to stream before saving the position, for each IoT hub partition.
// How many messages to stream before saving the position, for each Event hub partition.
// Since the stream position is saved in the Source, before the rest of the
// Graph (Flows/Sinks), this provides a mechanism to replay buffered messages.
// The value should be greater than receiverBatchSize
@ -77,7 +69,7 @@ iothub-react {
// If you use the same storage while processing multiple streams, you'll want
// to use a distinct table/container/path in each job, to to keep state isolated
namespace = "iothub-react-checkpoints"
namespace = "eventhub-react-checkpoints"
azureblob {
// Time allowed for a checkpoint to be written, rounded to seconds (min 15, max 60)
@ -86,8 +78,8 @@ iothub-react {
useEmulator = false
// Storage credentials
protocol = "https"
account = ${?IOTHUB_CHECKPOINT_ACCOUNT}
key = ${?IOTHUB_CHECKPOINT_KEY}
account = ${?CHECKPOINT_ACCOUNT}
key = ${?CHECKPOINT_KEY}
}
// You can easily test this with Docker --> docker run -ip 9042:9042 --rm cassandra

Просмотреть файл

@ -10,42 +10,31 @@
::
:: SET EVENTHUB_NAME="my-event-hub"
::
:: SET EVENTHUB_ENDPOINT="sb://iothub-ns-myioth-75186-9fb862f912.servicebus.windows.net/"
:: SET EVENTHUB_ENDPOINT="sb://your-namespace.servicebus.windows.net/"
::
:: SET EVENTHUB_PARTITIONS=4
::
:: SET EVENTHUB_ACCESS_POLICY="service"
::
:: SET EVENTHUB_ACCESS_KEY="1Ab23456C78d+E9fOgH1234ijklMNo5P//Q6rStuwX7="
::
:: SET EVENTHUB_ACCESS_HOSTNAME="my-event-hub.azure-devices.net"
:: SET EVENTHUB_ACCESS_KEY="..."
::
:: SET STREAMING_CHECKPOINT_ACCOUNT = 'myazurestorage'
::
:: SET STREAMING_CHECKPOINT_KEY = "A0BcDef1gHIJKlmn23o8PQrStUvWxyzAbc4dEFG5HOIJklMnopqR+StuVwxYzJjxsU6vnDeNTv7Ipqs8MaBcDE=="
:: SET STREAMING_CHECKPOINT_KEY = "..."
::
:: see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible name`
SET EVENTHUB_NAME = ""
:: see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible endpoint`
SET EVENTHUB_ENDPOINT = ""
:: see: Endpoints ⇒ Messaging ⇒ Events ⇒ Partitions
SET EVENTHUB_PARTITIONS = ""
:: see: Shared access policies, we suggest to use `service` here
SET EVENTHUB_ACCESS_POLICY = ""
:: see: Shared access policies ⇒ key name ⇒ Primary key
SET EVENTHUB_ACCESS_KEY = ""
:: see: Shared access policies ⇒ key name ⇒ Connection string ⇒ HostName
SET EVENTHUB_ACCESS_HOSTNAME = ""
:: When using checkpoints stored in Azure Blob, this is the Azure Storage Account name
SET STREAMING_CHECKPOINT_ACCOUNT = ""
:: When using checkpoints stored in Azure Blob, this is the Azure Storage Account secret key
SET STREAMING_CHECKPOINT_KEY = ""

Просмотреть файл

@ -10,39 +10,29 @@
#
# $env:EVENTHUB_NAME="my-event-hub"
#
# $env:EVENTHUB_ENDPOINT="sb://iothub-ns-myioth-75186-9fb862f912.servicebus.windows.net/"
# $env:EVENTHUB_ENDPOINT="sb://your-namespace.servicebus.windows.net/"
#
# $env:EVENTHUB_PARTITIONS=4
#
# $env:ACCESS_POLICY="service"
#
# $env:ACCESS_KEY="1Ab23456C78d+E9fOgH1234ijklMNo5P//Q6rStuwX7="
#
# $env:ACCESS_HOSTNAME="my-event-hub.azure-devices.net"
# $env:ACCESS_KEY="..."
#
# $env:STREAMING_CHECKPOINT_ACCOUNT = 'myazurestorage'
#
# $env:STREAMING_CHECKPOINT_KEY = "A0BcDef1gHIJKlmn23o8PQrStUvWxyzAbc4dEFG5HOIJklMnopqR+StuVwxYzJjxsU6vnDeNTv7Ipqs8MaBcDE=="
# $env:STREAMING_CHECKPOINT_KEY = "..."
#
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible name`
$env:EVENTHUB_NAME = ''
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible endpoint`
$env:EVENTHUB_ENDPOINT = ''
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ Partitions
$env:EVENTHUB_PARTITIONS = ''
# see: Shared access policies, we suggest to use `service` here
$env:EVENTHUB_ACCESS_POLICY = ''
# see: Shared access policies ⇒ key name ⇒ Primary key
$env:EVENTHUB_ACCESS_KEY = ''
# see: Shared access policies ⇒ key name ⇒ Connection string ⇒ HostName
$env:EVENTHUB_ACCESS_HOSTNAME = ''
# When using checkpoints stored in Azure Blob, this is the Azure Storage Account name
$env:STREAMING_CHECKPOINT_ACCOUNT = ''

Просмотреть файл

@ -10,39 +10,29 @@
#
# export EVENTHUB_NAME="my-event-hub"
#
# export EVENTHUB_ENDPOINT="sb://iothub-ns-myioth-75186-9fb862f912.servicebus.windows.net/"
# export EVENTHUB_ENDPOINT="sb://your-namespace.servicebus.windows.net/"
#
# export EVENTHUB_PARTITIONS=4
#
# export EVENTHUB_ACCESS_POLICY="service"
#
# export EVENTHUB_ACCESS_KEY="1Ab23456C78d+E9fOgH1234ijklMNo5P//Q6rStuwX7="
#
# export EVENTHUB_ACCESS_HOSTNAME="my-event-hub.azure-devices.net"
# export EVENTHUB_ACCESS_KEY="..."
#
# export STREAMING_CHECKPOINT_ACCOUNT = 'myazurestorage'
#
# export STREAMING_CHECKPOINT_KEY = "A0BcDef1gHIJKlmn23o8PQrStUvWxyzAbc4dEFG5HOIJklMnopqR+StuVwxYzJjxsU6vnDeNTv7Ipqs8MaBcDE=="
# export STREAMING_CHECKPOINT_KEY = "..."
#
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible name`
export EVENTHUB_NAME=""
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible endpoint`
export EVENTHUB_ENDPOINT=""
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ Partitions
export EVENTHUB_PARTITIONS=""
# see: Shared access policies, we suggest to use `service` here
export EVENTHUB_ACCESS_POLICY=""
# see: Shared access policies ⇒ key name ⇒ Primary key
export EVENTHUB_ACCESS_KEY=""
# see: Shared access policies ⇒ key name ⇒ Connection string ⇒ HostName
export EVENTHUB_ACCESS_HOSTNAME=""
# When using checkpoints stored in Azure Blob, this is the Azure Storage Account name
export STREAMING_CHECKPOINT_ACCOUNT=""

Просмотреть файл

@ -3,35 +3,21 @@
reactive-eventhubs {
// Connection settings can be retrieved from the Azure portal at https://portal.azure.com
// For more information about IoT Hub settings, see:
// https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-create-through-portal#endpoints
// https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-java-java-getstarted
connection {
// see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible name"
hubName = ${?EVENTHUB_NAME}
eventHubName = ${?EVENTHUB_NAME}
// see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible endpoint"
hubEndpoint = ${?EVENTHUB_ENDPOINT}
eventHubEndpoint = ${?EVENTHUB_ENDPOINT}
// see: Endpoints ⇒ Messaging ⇒ Events ⇒ Partitions
hubPartitions = ${?EVENTHUB_PARTITIONS}
eventHubPartitions = ${?EVENTHUB_PARTITIONS}
// see: "IoT Hub" ⇒ your hub ⇒ "Shared access policies"
// e.g. you should use the predefined "service" policy
accessPolicy = ${?EVENTHUB_ACCESS_POLICY}
// see: Shared access policies ⇒ key name ⇒ Primary key
accessKey = ${?EVENTHUB_ACCESS_KEY}
// see: Shared access policies ⇒ key name ⇒ Connection string ⇒ "HostName"
accessHostName = ${?EVENTHUB_ACCESS_HOSTNAME}
}
streaming {
// see: "IoT Hub" >> your hub > "Messaging" >> Consumer groups
// "$Default" is predefined and is the typical scenario
consumerGroup = "$Default"
// How many messages to retrieve on each pull, max is 999
@ -40,17 +26,17 @@ reactive-eventhubs {
// Value expressed as a duration, e.g. 3s, 3000ms, "3 seconds", etc.
receiverTimeout = 5s
// Whether to retrieve information about the partitions while streming events from IoT Hub
// Whether to retrieve information about the partitions while streming events from Event Hub
retrieveRuntimeInfo = false
}
checkpointing {
// Checkpoints frequency (best effort), for each IoT hub partition
// Checkpoints frequency (best effort), for each Event hub partition
// Min: 1 second, Max: 1 minute
frequency = 15s
// How many messages to stream before saving the position, for each IoT hub partition.
// How many messages to stream before saving the position, for each Event hub partition.
// Since the stream position is saved in the Source, before the rest of the
// Graph (Flows/Sinks), this provides a mechanism to replay buffered messages.
// The value should be greater than receiverBatchSize
@ -71,7 +57,7 @@ reactive-eventhubs {
// If you use the same storage while processing multiple streams, you'll want
// to use a distinct table/container/path in each job, to to keep state isolated
namespace = "iothub-react-checkpoints"
namespace = "eventhub-react-checkpoints"
// com.microsoft.azure.reactiveeventhubs.checkpointing.Backends.AzureBlob
azureblob {

Просмотреть файл

@ -38,9 +38,9 @@ class ConnectConfiguration(configData: Config) extends IConnectConfiguration {
private[this] val confConnPath = "eventhub-react.connection."
lazy val eventHubNamespace = getNamespaceFromEndpoint(configData.getString(confConnPath + "hubEndpoint"))
lazy val eventHubName = configData.getString(confConnPath + "hubName")
lazy val eventHubPartitions = configData.getInt(confConnPath + "hubPartitions")
lazy val eventHubNamespace = getNamespaceFromEndpoint(configData.getString(confConnPath + "eventHubEndpoint"))
lazy val eventHubName = configData.getString(confConnPath + "eventHubName")
lazy val eventHubPartitions = configData.getInt(confConnPath + "eventHubPartitions")
lazy val accessPolicy = configData.getString(confConnPath + "accessPolicy")
lazy val accessKey = configData.getString(confConnPath + "accessKey")

Просмотреть файл

@ -3,21 +3,21 @@ akka {
loglevel = "DEBUG"
}
iothub-react {
eventhub-react {
connection {
partitions = ${?IOTHUB_CI_PARTITIONS}
name = ${?IOTHUB_CI_NAME}
namespace = ${?IOTHUB_CI_NAMESPACE}
accessPolicy = ${?IOTHUB_CI_ACCESS_POLICY_0}
accessKey = ${?IOTHUB_CI_ACCESS_KEY_0}
devices = ${?IOTHUB_CI_DEVICES_JSON_FILE}
partitions = ${?CI_PARTITIONS}
name = ${?CI_NAME}
namespace = ${?CI_NAMESPACE}
accessPolicy = ${?CI_ACCESS_POLICY_0}
accessKey = ${?CI_ACCESS_KEY_0}
devices = ${?CI_DEVICES_JSON_FILE}
hubName = ${?IOTHUB_CI_EVENTHUB_NAME}
hubEndpoint = ${?IOTHUB_CI_EVENTHUB_ENDPOINT}
hubPartitions = ${?IOTHUB_CI_EVENTHUB_PARTITIONS}
accessHostName = ${?IOTHUB_CI_ACCESS_HOSTNAME}
hubName = ${?CI_EVENTHUB_NAME}
hubEndpoint = ${?CI_EVENTHUB_ENDPOINT}
hubPartitions = ${?CI_EVENTHUB_PARTITIONS}
accessHostName = ${?CI_ACCESS_HOSTNAME}
}
streaming {
@ -33,14 +33,14 @@ iothub-react {
storage {
rwTimeout = 6s
backendType = "AzureBlob"
namespace = "iothub-react-checkpoints"
namespace = "eventhub-react-checkpoints"
azureblob {
lease = 15s
useEmulator = false
protocol = "https"
account = ${?IOTHUB_CHECKPOINT_ACCOUNT}
key = ${?IOTHUB_CHECKPOINT_KEY}
account = ${?CHECKPOINT_ACCOUNT}
key = ${?CHECKPOINT_KEY}
}
cassandra {

Просмотреть файл

@ -1,46 +0,0 @@
[
{
"deviceId": "device10000",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10001",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10002",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10003",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10004",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10005",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10006",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10007",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10008",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10009",
"primaryKey": "...your device key..."
},
{
"deviceId": "device10010",
"primaryKey": "...your device key..."
}
]