Merge changes from master branch
This commit is contained in:
Коммит
d1695a73d8
|
@ -12,4 +12,4 @@ notifications:
|
|||
slack:
|
||||
secure: S6pcmclrj9vaqHOFMrjgYkF6wXrYF6nB5joYY0rqAwsmTLf7crXRVKZ8txlatpxMHc20Rbw8RQDM6tTka9wwBkHZZfErrcPsS84d5MU9siEkIY42/bAQwuYhxkcgilttgFmSwzLodE72giC/VMhIYCSOyOXIxuR0VtBiPD9Inm9QZ35dZDx3P3nbnaOC4fk+BjdbrX1LB8YL9z5Gy/9TqI90w0FV85XMef75EnSgpqeMD/GMB5hIg+arWVnC2S6hZ91PPCcxCTKBYDjwqUac8mFW/sMFT/yrb2c0NE6ZQqa3dlx/XFyC1X6+7DjJli2Y8OU+FPjY1tQC8JxgVFTbddIgCdUM/5be4uHN/KNs/yF7w1g06ZXK4jhJxxpL4zWINlqDrDmLaqhAtPQkc2CqL3g8MCwYxBbxZY4aFyPfZD7YLdQXDzJZNcfXn9RQQh5y+/zexbGc1zZ/XUo5bK3VbElSs+o2ErI+Sze0FaiK8fW+QeitBdGvjMY7YVKi0Zzf5Dxx1wwxiHR1PQ1r0hA8YZQxwwdpa5lWLFlSVu2w+upPtXqfINMeFktQPbOs1JWIvUvLV0A38dS6R/DsM/W1a3OEVbHQ0Z6OV1nffDnGYPLUl5kRDPFuYYugmCpQHW73lqJdiM0O+Ote4eOQniL1rcajtt+V5cn1/JRWzdJ4PH0=
|
||||
before_install:
|
||||
- openssl aes-256-cbc -K $encrypted_cbef0ff679f7_key -iv $encrypted_cbef0ff679f7_iv -in devices.json.enc -out src/test/resources/devices.json -d
|
||||
- if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then openssl aes-256-cbc -K $encrypted_cbef0ff679f7_key -iv $encrypted_cbef0ff679f7_iv -in devices.json.enc -out src/test/resources/devices.json -d ; fi
|
||||
|
|
|
@ -320,13 +320,13 @@ React API works. All the demos require an instance of Azure IoT hub, with some d
|
|||
connected devices.
|
||||
|
||||
We provide a [device simulator](tools/devices-simulator/README.md) in the tools section,
|
||||
which will help simulating some devices sending sample telemetry messages.
|
||||
which will help simulating some devices sending sample telemetry events.
|
||||
|
||||
When ready, you should either edit the `application.conf` configuration files
|
||||
([scala](samples-scala/src/main/resources/application.conf) and
|
||||
[java](samples-java/src/main/resources/application.conf))
|
||||
with your credentials, or set the corresponding environment variables.
|
||||
Follow the instructions in the previous section on how to set the correct values.
|
||||
Follow the instructions described in the previous section on how to set the correct values.
|
||||
|
||||
The root folder includes also a script showing how to set the environment variables in
|
||||
[Linux/MacOS](setup-env-vars.sh) and [Windows](setup-env-vars.bat).
|
||||
|
|
Двоичные данные
devices.json.enc
Двоичные данные
devices.json.enc
Двоичный файл не отображается.
|
@ -1,7 +1,5 @@
|
|||
@ECHO OFF
|
||||
|
||||
REM sbt samplesJava/run
|
||||
|
||||
ECHO Select demo to run:
|
||||
ECHO.
|
||||
ECHO [1] DisplayMessages.Main
|
||||
|
@ -14,5 +12,3 @@ cd samples-java
|
|||
|
||||
if "%opt%"=="1" mvn clean compile exec:java -Dexec.mainClass="DisplayMessages.Main"
|
||||
if "%opt%"=="2" mvn clean compile exec:java -Dexec.mainClass="SendMessageToDevice.Main"
|
||||
|
||||
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
#sbt samplesJava/run
|
||||
|
||||
echo "Select demo to run:"
|
||||
echo
|
||||
echo " [1] DisplayMessages.Main"
|
||||
|
@ -16,5 +14,3 @@ fi
|
|||
if [ "$opt" = "2" ]; then
|
||||
mvn clean compile exec:java -Dexec.mainClass="SendMessageToDevice.Main"
|
||||
fi
|
||||
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
</repositories>
|
||||
|
||||
<dependencies>
|
||||
<!-- TODO: fix `sbt samplesJava/run` and remove this dependency, using the source code instead -->
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure.iot</groupId>
|
||||
<artifactId>iothub-react_2.12</artifactId>
|
||||
|
|
|
@ -1 +1 @@
|
|||
sbt.version=0.13.13
|
||||
sbt.version = 0.13.13
|
||||
|
|
|
@ -17,21 +17,25 @@
|
|||
::
|
||||
:: SET IOTHUB_IOTHUB_ACCESS_POLICY = "service"
|
||||
::
|
||||
:: SET IOTHUB_ACCESS_KEY = "6XdRSFB9H61f+N3uOdBJiKwzeqbZUj1K//T2jFyewN4="
|
||||
:: SET IOTHUB_ACCESS_KEY = "1Ab23456C78d+E9fOgH1234ijklMNo5P//Q6rStuwX7="
|
||||
::
|
||||
:: SET IOTHUB_ACCESS_HOSTNAME = "my-iothub-one.azure-devices.net"
|
||||
::
|
||||
:: SET IOTHUB_CHECKPOINT_ACCOUNT = "myazurestorage"
|
||||
::
|
||||
:: SET IOTHUB_CHECKPOINT_KEY = "A0BcDef1gHIJKlmn23o8PQrStUvWxyzAbc4dEFG5HOIJklMnopqR+StuVwxYzJjxsU6vnDeNTv7Ipqs8MaBcDE=="
|
||||
::
|
||||
|
||||
:: see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible name"
|
||||
:: see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible name`
|
||||
SET IOTHUB_EVENTHUB_NAME = ""
|
||||
|
||||
:: see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible endpoint"
|
||||
:: see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible endpoint`
|
||||
SET IOTHUB_EVENTHUB_ENDPOINT = ""
|
||||
|
||||
:: see: Endpoints ⇒ Messaging ⇒ Events ⇒ Partitions
|
||||
SET IOTHUB_EVENTHUB_PARTITIONS = ""
|
||||
|
||||
:: see: Shared access policies, we suggest to use "service" here
|
||||
:: see: Shared access policies, we suggest to use `service` here
|
||||
SET IOTHUB_IOTHUB_ACCESS_POLICY = ""
|
||||
|
||||
:: see: Shared access policies ⇒ key name ⇒ Primary key
|
||||
|
@ -39,3 +43,10 @@ SET IOTHUB_ACCESS_KEY = ""
|
|||
|
||||
:: see: Shared access policies ⇒ key name ⇒ Connection string ⇒ HostName
|
||||
SET IOTHUB_ACCESS_HOSTNAME = ""
|
||||
|
||||
:: When using checkpoints stored in Azure Blob, this is the Azure Storage Account name
|
||||
SET IOTHUB_CHECKPOINT_ACCOUNT = ""
|
||||
|
||||
:: When using checkpoints stored in Azure Blob, this is the Azure Storage Account secret key
|
||||
SET IOTHUB_CHECKPOINT_KEY = ""
|
||||
|
||||
|
|
|
@ -17,21 +17,25 @@
|
|||
#
|
||||
# $env:IOTHUB_IOTHUB_ACCESS_POLICY = 'service'
|
||||
#
|
||||
# $env:IOTHUB_ACCESS_KEY = '6XdRSFB9H61f+N3uOdBJiKwzeqbZUj1K//T2jFyewN4='
|
||||
# $env:IOTHUB_ACCESS_KEY = '1Ab23456C78d+E9fOgH1234ijklMNo5P//Q6rStuwX7='
|
||||
#
|
||||
# SET IOTHUB_ACCESS_HOSTNAME = "my-iothub-one.azure-devices.net"
|
||||
# $env:IOTHUB_ACCESS_HOSTNAME = 'my-iothub-one.azure-devices.net'
|
||||
#
|
||||
# $env:IOTHUB_CHECKPOINT_ACCOUNT = 'myazurestorage'
|
||||
#
|
||||
# $env:IOTHUB_CHECKPOINT_KEY = 'A0BcDef1gHIJKlmn23o8PQrStUvWxyzAbc4dEFG5HOIJklMnopqR+StuVwxYzJjxsU6vnDeNTv7Ipqs8MaBcDE=='
|
||||
#
|
||||
|
||||
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible name"
|
||||
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible name`
|
||||
$env:IOTHUB_EVENTHUB_NAME = ''
|
||||
|
||||
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible endpoint"
|
||||
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible endpoint`
|
||||
$env:IOTHUB_EVENTHUB_ENDPOINT = ''
|
||||
|
||||
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ Partitions
|
||||
$env:IOTHUB_EVENTHUB_PARTITIONS = ''
|
||||
|
||||
# see: Shared access policies, we suggest to use "service" here
|
||||
# see: Shared access policies, we suggest to use `service` here
|
||||
$env:IOTHUB_IOTHUB_ACCESS_POLICY = ''
|
||||
|
||||
# see: Shared access policies ⇒ key name ⇒ Primary key
|
||||
|
@ -39,3 +43,9 @@ $env:IOTHUB_ACCESS_KEY = ''
|
|||
|
||||
# see: Shared access policies ⇒ key name ⇒ Connection string ⇒ HostName
|
||||
$env:IOTHUB_ACCESS_HOSTNAME = ''
|
||||
|
||||
# When using checkpoints stored in Azure Blob, this is the Azure Storage Account name
|
||||
$env:IOTHUB_CHECKPOINT_ACCOUNT = ''
|
||||
|
||||
# When using checkpoints stored in Azure Blob, this is the Azure Storage Account secret key
|
||||
$env:IOTHUB_CHECKPOINT_KEY = ''
|
||||
|
|
|
@ -17,21 +17,25 @@
|
|||
#
|
||||
# export IOTHUB_IOTHUB_ACCESS_POLICY="service"
|
||||
#
|
||||
# export IOTHUB_ACCESS_KEY="6XdRSFB9H61f+N3uOdBJiKwzeqbZUj1K//T2jFyewN4="
|
||||
# export IOTHUB_ACCESS_KEY="1Ab23456C78d+E9fOgH1234ijklMNo5P//Q6rStuwX7="
|
||||
#
|
||||
# export IOTHUB_ACCESS_HOSTNAME="my-iothub-one.azure-devices.net"
|
||||
#
|
||||
# export IOTHUB_CHECKPOINT_ACCOUNT = 'myazurestorage'
|
||||
#
|
||||
# export IOTHUB_CHECKPOINT_KEY = "A0BcDef1gHIJKlmn23o8PQrStUvWxyzAbc4dEFG5HOIJklMnopqR+StuVwxYzJjxsU6vnDeNTv7Ipqs8MaBcDE=="
|
||||
#
|
||||
|
||||
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible name"
|
||||
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible name`
|
||||
export IOTHUB_EVENTHUB_NAME=""
|
||||
|
||||
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ "Event Hub-compatible endpoint"
|
||||
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ `Event Hub-compatible endpoint`
|
||||
export IOTHUB_EVENTHUB_ENDPOINT=""
|
||||
|
||||
# see: Endpoints ⇒ Messaging ⇒ Events ⇒ Partitions
|
||||
export IOTHUB_EVENTHUB_PARTITIONS=""
|
||||
|
||||
# see: Shared access policies, we suggest to use "service" here
|
||||
# see: Shared access policies, we suggest to use `service` here
|
||||
export IOTHUB_IOTHUB_ACCESS_POLICY=""
|
||||
|
||||
# see: Shared access policies ⇒ key name ⇒ Primary key
|
||||
|
@ -39,3 +43,9 @@ export IOTHUB_ACCESS_KEY=""
|
|||
|
||||
# see: Shared access policies ⇒ key name ⇒ Connection string ⇒ HostName
|
||||
export IOTHUB_ACCESS_HOSTNAME=""
|
||||
|
||||
# When using checkpoints stored in Azure Blob, this is the Azure Storage Account name
|
||||
export IOTHUB_CHECKPOINT_ACCOUNT=""
|
||||
|
||||
# When using checkpoints stored in Azure Blob, this is the Azure Storage Account secret key
|
||||
export IOTHUB_CHECKPOINT_KEY=""
|
||||
|
|
|
@ -19,81 +19,86 @@ import scala.language.postfixOps
|
|||
|
||||
class AllIoTDeviceMessagesAreDelivered extends FeatureSpec with GivenWhenThen {
|
||||
|
||||
info("As a client of Azure IoT hub")
|
||||
info("I want to be able to receive all device messages")
|
||||
info("So I can process them all")
|
||||
// TODO: we should use tags
|
||||
if (!sys.env.contains("TRAVIS_PULL_REQUEST") || sys.env("TRAVIS_PULL_REQUEST") == "false") {
|
||||
|
||||
// A label shared by all the messages, to filter out data sent by other tests
|
||||
val testRunId: String = s"[${this.getClass.getName}-" + java.util.UUID.randomUUID().toString + "]"
|
||||
info("As a client of Azure IoT hub")
|
||||
info("I want to be able to receive all device messages")
|
||||
info("So I can process them all")
|
||||
|
||||
val counter = actorSystem.actorOf(Props[Counter], this.getClass.getName + "Counter")
|
||||
counter ! "reset"
|
||||
// A label shared by all the messages, to filter out data sent by other tests
|
||||
val testRunId: String = s"[${this.getClass.getName}-" + java.util.UUID.randomUUID().toString + "]"
|
||||
|
||||
def readCounter: Long = {
|
||||
Await.result(counter.ask("get")(5 seconds), 5 seconds).asInstanceOf[Long]
|
||||
}
|
||||
val counter = actorSystem.actorOf(Props[Counter], this.getClass.getName + "Counter")
|
||||
counter ! "reset"
|
||||
|
||||
feature("All IoT device messages are delivered") {
|
||||
def readCounter: Long = {
|
||||
Await.result(counter.ask("get")(5 seconds), 5 seconds).asInstanceOf[Long]
|
||||
}
|
||||
|
||||
scenario("Application wants to retrieve all IoT messages") {
|
||||
feature("All IoT device messages are delivered") {
|
||||
|
||||
// How many seconds we allow the test to wait for messages from the stream
|
||||
val TestTimeout = 60 seconds
|
||||
val DevicesCount = 5
|
||||
val MessagesPerDevice = 3
|
||||
val expectedMessageCount = DevicesCount * MessagesPerDevice
|
||||
scenario("Application wants to retrieve all IoT messages") {
|
||||
|
||||
// Create devices
|
||||
val devices = new collection.mutable.ListMap[Int, Device]()
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber) = new Device("device" + (10000 + deviceNumber))
|
||||
// How many seconds we allow the test to wait for messages from the stream
|
||||
val TestTimeout = 60 seconds
|
||||
val DevicesCount = 5
|
||||
val MessagesPerDevice = 3
|
||||
val expectedMessageCount = DevicesCount * MessagesPerDevice
|
||||
|
||||
// We'll use this as the streaming start date
|
||||
val startTime = Instant.now().minusSeconds(30)
|
||||
log.info(s"Test run: ${testRunId}, Start time: ${startTime}")
|
||||
// Create devices
|
||||
val devices = new collection.mutable.ListMap[Int, Device]()
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber) = new Device("device" + (10000 + deviceNumber))
|
||||
|
||||
Given("An IoT hub is configured")
|
||||
val hub = IoTHub()
|
||||
val messages = hub.source(startTime, false)
|
||||
// We'll use this as the streaming start date
|
||||
val startTime = Instant.now().minusSeconds(30)
|
||||
log.info(s"Test run: ${testRunId}, Start time: ${startTime}")
|
||||
|
||||
And(s"${DevicesCount} devices have sent ${MessagesPerDevice} messages each")
|
||||
for (msgNumber ← 1 to MessagesPerDevice) {
|
||||
for (deviceNumber ← 0 until DevicesCount) {
|
||||
devices(deviceNumber).sendMessage(testRunId, msgNumber)
|
||||
// Workaround for issue 995
|
||||
if (msgNumber == 1) devices(deviceNumber).waitConfirmation()
|
||||
Given("An IoT hub is configured")
|
||||
val hub = IoTHub()
|
||||
val messages = hub.source(startTime, false)
|
||||
|
||||
And(s"${DevicesCount} devices have sent ${MessagesPerDevice} messages each")
|
||||
for (msgNumber ← 1 to MessagesPerDevice) {
|
||||
for (deviceNumber ← 0 until DevicesCount) {
|
||||
devices(deviceNumber).sendMessage(testRunId, msgNumber)
|
||||
// Workaround for issue 995
|
||||
if (msgNumber == 1) devices(deviceNumber).waitConfirmation()
|
||||
}
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber).waitConfirmation()
|
||||
}
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber).waitConfirmation()
|
||||
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber).disconnect()
|
||||
|
||||
log.info(s"Messages sent: $expectedMessageCount")
|
||||
|
||||
When("A client application processes messages from the stream")
|
||||
counter ! "reset"
|
||||
val count = Sink.foreach[MessageFromDevice] {
|
||||
m ⇒ counter ! "inc"
|
||||
}
|
||||
|
||||
messages
|
||||
.filter(m ⇒ m.contentAsString contains testRunId)
|
||||
.runWith(count)
|
||||
|
||||
Then("Then the client application receives all the messages sent")
|
||||
var time = TestTimeout.toMillis.toInt
|
||||
val pause = time / 10
|
||||
var actualMessageCount = readCounter
|
||||
while (time > 0 && actualMessageCount < expectedMessageCount) {
|
||||
Thread.sleep(pause)
|
||||
time -= pause
|
||||
actualMessageCount = readCounter
|
||||
log.info(s"Messages received so far: ${actualMessageCount} of ${expectedMessageCount} [Time left ${time / 1000} secs]")
|
||||
}
|
||||
|
||||
hub.close()
|
||||
|
||||
assert(actualMessageCount == expectedMessageCount,
|
||||
s"Expecting ${expectedMessageCount} messages but received ${actualMessageCount}")
|
||||
}
|
||||
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber).disconnect()
|
||||
|
||||
log.info(s"Messages sent: $expectedMessageCount")
|
||||
|
||||
When("A client application processes messages from the stream")
|
||||
counter ! "reset"
|
||||
val count = Sink.foreach[MessageFromDevice] {
|
||||
m ⇒ counter ! "inc"
|
||||
}
|
||||
|
||||
messages
|
||||
.filter(m ⇒ m.contentAsString contains testRunId)
|
||||
.runWith(count)
|
||||
|
||||
Then("Then the client application receives all the messages sent")
|
||||
var time = TestTimeout.toMillis.toInt
|
||||
val pause = time / 10
|
||||
var actualMessageCount = readCounter
|
||||
while (time > 0 && actualMessageCount < expectedMessageCount) {
|
||||
Thread.sleep(pause)
|
||||
time -= pause
|
||||
actualMessageCount = readCounter
|
||||
log.info(s"Messages received so far: ${actualMessageCount} of ${expectedMessageCount} [Time left ${time / 1000} secs]")
|
||||
}
|
||||
|
||||
hub.close()
|
||||
|
||||
assert(actualMessageCount == expectedMessageCount,
|
||||
s"Expecting ${expectedMessageCount} messages but received ${actualMessageCount}")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,102 +20,106 @@ import scala.language.postfixOps
|
|||
|
||||
class DeviceIoTMessagesAreDeliveredInOrder extends FeatureSpec with GivenWhenThen {
|
||||
|
||||
info("As a client of Azure IoT hub")
|
||||
info("I want to receive the messages in order")
|
||||
info("So I can process them in order")
|
||||
// TODO: we should use tags
|
||||
if (!sys.env.contains("TRAVIS_PULL_REQUEST") || sys.env("TRAVIS_PULL_REQUEST") == "false") {
|
||||
|
||||
// A label shared by all the messages, to filter out data sent by other tests
|
||||
val testRunId: String = s"[${this.getClass.getName}-" + java.util.UUID.randomUUID().toString + "]"
|
||||
info("As a client of Azure IoT hub")
|
||||
info("I want to receive the messages in order")
|
||||
info("So I can process them in order")
|
||||
|
||||
val counter = actorSystem.actorOf(Props[Counter], this.getClass.getName + "Counter")
|
||||
counter ! "reset"
|
||||
// A label shared by all the messages, to filter out data sent by other tests
|
||||
val testRunId: String = s"[${this.getClass.getName}-" + java.util.UUID.randomUUID().toString + "]"
|
||||
|
||||
def readCounter: Long = {
|
||||
Await.result(counter.ask("get")(5 seconds), 5 seconds).asInstanceOf[Long]
|
||||
}
|
||||
val counter = actorSystem.actorOf(Props[Counter], this.getClass.getName + "Counter")
|
||||
counter ! "reset"
|
||||
|
||||
feature("Device IoT messages are delivered in order") {
|
||||
def readCounter: Long = {
|
||||
Await.result(counter.ask("get")(5 seconds), 5 seconds).asInstanceOf[Long]
|
||||
}
|
||||
|
||||
// Note: messages are sent in parallel to obtain some level of mix in the
|
||||
// storage, so do not refactor, i.e. don't do one device at a time.
|
||||
scenario("Customer needs to process IoT messages in the right order") {
|
||||
feature("Device IoT messages are delivered in order") {
|
||||
|
||||
// How many seconds we allow the test to wait for messages from the stream
|
||||
val TestTimeout = 120 seconds
|
||||
val DevicesCount = 25
|
||||
val MessagesPerDevice = 100
|
||||
val expectedMessageCount = DevicesCount * MessagesPerDevice
|
||||
// Note: messages are sent in parallel to obtain some level of mix in the
|
||||
// storage, so do not refactor, i.e. don't do one device at a time.
|
||||
scenario("Customer needs to process IoT messages in the right order") {
|
||||
|
||||
// Initialize device objects
|
||||
val devices = new collection.mutable.ListMap[Int, Device]()
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber) = new Device("device" + (10000 + deviceNumber))
|
||||
// How many seconds we allow the test to wait for messages from the stream
|
||||
val TestTimeout = 120 seconds
|
||||
val DevicesCount = 25
|
||||
val MessagesPerDevice = 100
|
||||
val expectedMessageCount = DevicesCount * MessagesPerDevice
|
||||
|
||||
// We'll use this as the streaming start date
|
||||
val startTime = Instant.now().minusSeconds(30)
|
||||
log.info(s"Test run: ${testRunId}, Start time: ${startTime}")
|
||||
// Initialize device objects
|
||||
val devices = new collection.mutable.ListMap[Int, Device]()
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber) = new Device("device" + (10000 + deviceNumber))
|
||||
|
||||
Given("An IoT hub is configured")
|
||||
val hub = IoTHub()
|
||||
val messages = hub.source(startTime, false)
|
||||
// We'll use this as the streaming start date
|
||||
val startTime = Instant.now().minusSeconds(30)
|
||||
log.info(s"Test run: ${testRunId}, Start time: ${startTime}")
|
||||
|
||||
And(s"${DevicesCount} devices have sent ${MessagesPerDevice} messages each")
|
||||
for (msgNumber ← 1 to MessagesPerDevice) {
|
||||
for (deviceNumber ← 0 until DevicesCount) {
|
||||
devices(deviceNumber).sendMessage(testRunId, msgNumber)
|
||||
// temporary workaround for issue 995
|
||||
if (msgNumber == 1) devices(deviceNumber).waitConfirmation()
|
||||
Given("An IoT hub is configured")
|
||||
val hub = IoTHub()
|
||||
val messages = hub.source(startTime, false)
|
||||
|
||||
And(s"${DevicesCount} devices have sent ${MessagesPerDevice} messages each")
|
||||
for (msgNumber ← 1 to MessagesPerDevice) {
|
||||
for (deviceNumber ← 0 until DevicesCount) {
|
||||
devices(deviceNumber).sendMessage(testRunId, msgNumber)
|
||||
// temporary workaround for issue 995
|
||||
if (msgNumber == 1) devices(deviceNumber).waitConfirmation()
|
||||
}
|
||||
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber).waitConfirmation()
|
||||
}
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber).disconnect()
|
||||
log.info(s"Messages sent: $expectedMessageCount")
|
||||
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber).waitConfirmation()
|
||||
}
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber).disconnect()
|
||||
log.info(s"Messages sent: $expectedMessageCount")
|
||||
When("A client application processes messages from the stream")
|
||||
|
||||
When("A client application processes messages from the stream")
|
||||
Then("Then the client receives all the messages ordered within each device")
|
||||
counter ! "reset"
|
||||
val cursors = new mutable.ParHashMap[String, Long]
|
||||
val verifier = Sink.foreach[MessageFromDevice] {
|
||||
m ⇒ {
|
||||
counter ! "inc"
|
||||
log.debug(s"device: ${m.deviceId}, seq: ${m.sequenceNumber} ")
|
||||
|
||||
Then("Then the client receives all the messages ordered within each device")
|
||||
counter ! "reset"
|
||||
val cursors = new mutable.ParHashMap[String, Long]
|
||||
val verifier = Sink.foreach[MessageFromDevice] {
|
||||
m ⇒ {
|
||||
counter ! "inc"
|
||||
log.debug(s"device: ${m.deviceId}, seq: ${m.sequenceNumber} ")
|
||||
|
||||
if (!cursors.contains(m.deviceId)) {
|
||||
if (!cursors.contains(m.deviceId)) {
|
||||
cursors.put(m.deviceId, m.sequenceNumber)
|
||||
}
|
||||
if (cursors(m.deviceId) > m.sequenceNumber) {
|
||||
fail(s"Message out of order. " +
|
||||
s"Device ${m.deviceId}, message ${m.sequenceNumber} arrived " +
|
||||
s"after message ${cursors(m.deviceId)}")
|
||||
}
|
||||
cursors.put(m.deviceId, m.sequenceNumber)
|
||||
}
|
||||
if (cursors(m.deviceId) > m.sequenceNumber) {
|
||||
fail(s"Message out of order. " +
|
||||
s"Device ${m.deviceId}, message ${m.sequenceNumber} arrived " +
|
||||
s"after message ${cursors(m.deviceId)}")
|
||||
}
|
||||
cursors.put(m.deviceId, m.sequenceNumber)
|
||||
}
|
||||
|
||||
messages
|
||||
.filter(m ⇒ m.contentAsString contains (testRunId))
|
||||
.runWith(verifier)
|
||||
|
||||
// Wait till all messages have been verified
|
||||
var time = TestTimeout.toMillis.toInt
|
||||
val pause = time / 12
|
||||
var actualMessageCount = readCounter
|
||||
while (time > 0 && actualMessageCount < expectedMessageCount) {
|
||||
Thread.sleep(pause)
|
||||
time -= pause
|
||||
actualMessageCount = readCounter
|
||||
log.info(s"Messages received so far: ${actualMessageCount} of ${expectedMessageCount} [Time left ${time / 1000} secs]")
|
||||
}
|
||||
|
||||
log.info("Stopping stream")
|
||||
hub.close()
|
||||
|
||||
log.info(s"actual messages ${actualMessageCount}")
|
||||
|
||||
assert(
|
||||
actualMessageCount == expectedMessageCount,
|
||||
s"Expecting ${expectedMessageCount} messages but received ${actualMessageCount}")
|
||||
}
|
||||
|
||||
messages
|
||||
.filter(m ⇒ m.contentAsString contains (testRunId))
|
||||
.runWith(verifier)
|
||||
|
||||
// Wait till all messages have been verified
|
||||
var time = TestTimeout.toMillis.toInt
|
||||
val pause = time / 12
|
||||
var actualMessageCount = readCounter
|
||||
while (time > 0 && actualMessageCount < expectedMessageCount) {
|
||||
Thread.sleep(pause)
|
||||
time -= pause
|
||||
actualMessageCount = readCounter
|
||||
log.info(s"Messages received so far: ${actualMessageCount} of ${expectedMessageCount} [Time left ${time / 1000} secs]")
|
||||
}
|
||||
|
||||
log.info("Stopping stream")
|
||||
hub.close()
|
||||
|
||||
log.info(s"actual messages ${actualMessageCount}")
|
||||
|
||||
assert(
|
||||
actualMessageCount == expectedMessageCount,
|
||||
s"Expecting ${expectedMessageCount} messages but received ${actualMessageCount}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,86 +15,90 @@ import scala.language.{implicitConversions, postfixOps}
|
|||
|
||||
class TestConnectivity extends FeatureSpec with GivenWhenThen {
|
||||
|
||||
info("As a test runner")
|
||||
info("I want to connect to EventuHub")
|
||||
info("So I can run the test suite")
|
||||
// TODO: we should use tags
|
||||
if (!sys.env.contains("TRAVIS_PULL_REQUEST") || sys.env("TRAVIS_PULL_REQUEST") == "false") {
|
||||
|
||||
// A label shared by all the messages, to filter out data sent by other tests
|
||||
val testRunId = s"[${this.getClass.getName}-" + java.util.UUID.randomUUID().toString + "]"
|
||||
val startTime = Instant.now().minusSeconds(60)
|
||||
info("As a test runner")
|
||||
info("I want to connect to EventuHub")
|
||||
info("So I can run the test suite")
|
||||
|
||||
feature("The test suite can connect to IoT Hub") {
|
||||
// A label shared by all the messages, to filter out data sent by other tests
|
||||
val testRunId = s"[${this.getClass.getName}-" + java.util.UUID.randomUUID().toString + "]"
|
||||
val startTime = Instant.now().minusSeconds(60)
|
||||
|
||||
scenario("The test uses the configured credentials") {
|
||||
feature("The test suite can connect to IoT Hub") {
|
||||
|
||||
// Enough devices to hit the first partitions, so that the test ends quickly
|
||||
val DevicesCount = 10
|
||||
scenario("The test uses the configured credentials") {
|
||||
|
||||
// Create devices
|
||||
val devices = new collection.mutable.ListMap[Int, Device]()
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber) = new Device("device" + (10000 + deviceNumber))
|
||||
// Enough devices to hit the first partitions, so that the test ends quickly
|
||||
val DevicesCount = 10
|
||||
|
||||
// Send a message from each device
|
||||
for (deviceNumber ← 0 until DevicesCount) {
|
||||
devices(deviceNumber).sendMessage(testRunId, 0)
|
||||
// Workaround for issue 995
|
||||
devices(deviceNumber).waitConfirmation()
|
||||
}
|
||||
// Create devices
|
||||
val devices = new collection.mutable.ListMap[Int, Device]()
|
||||
for (deviceNumber ← 0 until DevicesCount) devices(deviceNumber) = new Device("device" + (10000 + deviceNumber))
|
||||
|
||||
// Wait and disconnect
|
||||
for (deviceNumber ← 0 until DevicesCount) {
|
||||
devices(deviceNumber).waitConfirmation()
|
||||
devices(deviceNumber).disconnect()
|
||||
}
|
||||
|
||||
val connString = new ConnectionStringBuilder(
|
||||
Configuration.iotHubNamespace,
|
||||
Configuration.iotHubName,
|
||||
Configuration.accessPolicy,
|
||||
Configuration.accessKey).toString
|
||||
|
||||
log.info(s"Connecting to IoT Hub")
|
||||
val client = EventHubClient.createFromConnectionStringSync(connString)
|
||||
|
||||
var found = false
|
||||
var attempts = 0
|
||||
var p = 0
|
||||
|
||||
// Check that at least one message arrived to IoT Hub
|
||||
while (!found && p < Configuration.iotHubPartitions) {
|
||||
|
||||
log.info(s"Checking partition ${p}")
|
||||
val receiver: PartitionReceiver = client.createReceiverSync(Configuration.receiverConsumerGroup, p.toString, startTime)
|
||||
|
||||
log.info("Receiver getEpoch(): " + receiver.getEpoch)
|
||||
log.info("Receiver getPartitionId(): " + receiver.getPartitionId)
|
||||
log.info("Receiver getPrefetchCount(): " + receiver.getPrefetchCount)
|
||||
log.info("Receiver getReceiveTimeout(): " + receiver.getReceiveTimeout)
|
||||
|
||||
attempts = 0
|
||||
while (!found && attempts < 100) {
|
||||
attempts += 1
|
||||
|
||||
log.info(s"Receiving batch ${attempts}")
|
||||
val records = receiver.receiveSync(999)
|
||||
if (records == null) {
|
||||
attempts = Int.MaxValue
|
||||
log.info("This partition is empty")
|
||||
} else {
|
||||
val messages = records.asScala
|
||||
log.info(s"Messages retrieved ${messages.size}")
|
||||
|
||||
val matching = messages.filter(e ⇒ new String(e.getBody) contains testRunId)
|
||||
log.info(s"Matching messages ${matching.size}")
|
||||
|
||||
found = (matching.size > 0)
|
||||
}
|
||||
// Send a message from each device
|
||||
for (deviceNumber ← 0 until DevicesCount) {
|
||||
devices(deviceNumber).sendMessage(testRunId, 0)
|
||||
// Workaround for issue 995
|
||||
devices(deviceNumber).waitConfirmation()
|
||||
}
|
||||
|
||||
p += 1
|
||||
}
|
||||
// Wait and disconnect
|
||||
for (deviceNumber ← 0 until DevicesCount) {
|
||||
devices(deviceNumber).waitConfirmation()
|
||||
devices(deviceNumber).disconnect()
|
||||
}
|
||||
|
||||
assert(found, s"Expecting to find at least one of the messages sent")
|
||||
val connString = new ConnectionStringBuilder(
|
||||
Configuration.iotHubNamespace,
|
||||
Configuration.iotHubName,
|
||||
Configuration.accessPolicy,
|
||||
Configuration.accessKey).toString
|
||||
|
||||
log.info(s"Connecting to IoT Hub")
|
||||
val client = EventHubClient.createFromConnectionStringSync(connString)
|
||||
|
||||
var found = false
|
||||
var attempts = 0
|
||||
var p = 0
|
||||
|
||||
// Check that at least one message arrived to IoT Hub
|
||||
while (!found && p < Configuration.iotHubPartitions) {
|
||||
|
||||
log.info(s"Checking partition ${p}")
|
||||
val receiver: PartitionReceiver = client.createReceiverSync(Configuration.receiverConsumerGroup, p.toString, startTime)
|
||||
|
||||
log.info("Receiver getEpoch(): " + receiver.getEpoch)
|
||||
log.info("Receiver getPartitionId(): " + receiver.getPartitionId)
|
||||
log.info("Receiver getPrefetchCount(): " + receiver.getPrefetchCount)
|
||||
log.info("Receiver getReceiveTimeout(): " + receiver.getReceiveTimeout)
|
||||
|
||||
attempts = 0
|
||||
while (!found && attempts < 100) {
|
||||
attempts += 1
|
||||
|
||||
log.info(s"Receiving batch ${attempts}")
|
||||
val records = receiver.receiveSync(999)
|
||||
if (records == null) {
|
||||
attempts = Int.MaxValue
|
||||
log.info("This partition is empty")
|
||||
} else {
|
||||
val messages = records.asScala
|
||||
log.info(s"Messages retrieved ${messages.size}")
|
||||
|
||||
val matching = messages.filter(e ⇒ new String(e.getBody) contains testRunId)
|
||||
log.info(s"Matching messages ${matching.size}")
|
||||
|
||||
found = (matching.size > 0)
|
||||
}
|
||||
}
|
||||
|
||||
p += 1
|
||||
}
|
||||
|
||||
assert(found, s"Expecting to find at least one of the messages sent")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче