From 515c2f135df408cb846b188f4707ec5e938fa193 Mon Sep 17 00:00:00 2001 From: Varun Date: Mon, 30 Jan 2017 14:10:40 -0800 Subject: [PATCH] Addressing PR feedback and changing using sync version of ServiceClient.sync - Several minor updates to address PR feedback - Changed to use the sync version of ServiceClient.sendMessage (as the async one has some bugs) - Made some updates to the README. --- README.md | 5 +- README_Sink.md | 62 +++++++++++-------- README_Source.md | 4 +- build.sbt | 4 +- .../connect/sink/C2DMessageConverter.scala | 46 +++++++------- .../connect/sink/IotHubMessageSender.scala | 10 ++- .../kafka/connect/sink/IotHubSinkConfig.scala | 2 +- .../connect/sink/IotHubSinkConnector.scala | 3 +- .../kafka/connect/sink/IotHubSinkTask.scala | 42 +------------ .../kafka/connect/sink/MessageSender.scala | 6 +- .../connect/sink/IotHubSinkTaskTest.scala | 26 ++------ .../sink/testhelpers/MockMessageSender.scala | 23 +++---- .../sink/testhelpers/SinkTestConfig.scala | 2 +- .../sink/testhelpers/TestIotHubSinkTask.scala | 8 +-- 14 files changed, 95 insertions(+), 148 deletions(-) diff --git a/README.md b/README.md index 0285fc3..eb500a3 100755 --- a/README.md +++ b/README.md @@ -5,8 +5,9 @@ Kafka Connect Azure IoT Hub consists of 2 connectors - a source connector and a is used to pump data from [Azure IoT Hub](https://azure.microsoft.com/en-us/services/iot-hub/) to [Apache Kafka](https://kafka.apache.org/), whereas the sink connector reads messages from Kafka and sends them to IoT devices via [Azure IoT Hub](https://azure.microsoft.com/en-us/services/iot-hub/). When used in tandem, the 2 - connectors allow communicating with IoT devices by simply posting and reading messages to Kafka topics. This should - makie it easier for open source systems and other systems that already interface with Kafka to communicate with + connectors allow communicating with IoT devices by simply posting and reading messages to/from Kafka topics. This + should + make it easier for open source systems and other systems that already interface with Kafka to communicate with Azure IoT devices. For more information on the capabilities of the connectors and how to use them, please refer to the links below - diff --git a/README_Sink.md b/README_Sink.md index 87c6278..47f040c 100644 --- a/README_Sink.md +++ b/README_Sink.md @@ -3,9 +3,17 @@ ________________________ The Sink connector allows you to send messages to Azure IoT devices by simply posting messages to Kafka topics. The connector sends the messages to Azure IoT Hub, which in turn forwards them to the right devices. The -messages need to be in a specific format (details below), that allow the connector to extract the information +messages need to be in a specific format (details below), that allows the connector to extract the information necessary to send them to the right devices. +> Note: At the moment, the Sink connector only supports C2D messages, and does not support other means of communication. +> See +> [Cloud-to-device communications guidance](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-c2d-guidance) +> to see what that means. Also, the connector does not support getting feedback from the devices on the status of the +> messages (whether accepted, rejected or expired). If you want to get feedback, you will need to do it manually. +> Please refer to the documentation [here](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-java-java-c2d) on +> how to do that. + ### Configuration All the configuration settings required to run Kafka sink connector for IoT Hub are in the @@ -32,20 +40,15 @@ connector.class=com.microsoft.azure.iot.kafka.connect.sink.IotHubSinkConnector name=AzureIotHubSinkConnector tasks.max=1 topics=testtopic -IotHub.ConnectionString=HostName=Test.azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=aBCdeBb5HffTTs/J9ikdcqab1JNMB0ot= +IotHub.ConnectionString=HostName=Test.azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=aBCdeBsdfwfTTs/isuwselskab1Jksjdsot= IotHub.MessageDeliveryAcknowledgement=None ``` -> Note: At the moment, the Sink connector does not support getting feedback from the devices on the status of the -> messages (whether accepted or rejected). If you want to get feedback, you will need to do it manually. Please refer -> to the documentation [here](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-java-java-c2d) on how to do that. - - ### Data format The sink connector expects the messages in the Kafka topic to have the schema below, so that it can extract the information required to send the message to the IoT device. If a record is not in the expected format, the connector -will throw an exception. +will throw an exception, and will have to be re-started manually. ```json { @@ -107,11 +110,18 @@ use the right Azure IoT Hub and Kafka settings. For more details on using Kafka * You also need to have Apache Kafka v0.10 installation running, that contains messages to be sent to the IoT devices in one or more topics. Get started with Kafka [here](http://docs.confluent.io/3.0.0/quickstart.html). - The steps to insert messages in Kafka topics and to run the Sink connector depend on whether you are using the - [Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs/) along with the Confluent platform. So please - follow the right set of steps below depending on your choice. +#### Steps -#### Steps when using Schema Registry +Here are the steps to run the Kafka Connect IoT Hub Sink Connector in +[standalone mode](http://docs.confluent.io/3.0.0/connect/userguide.html#standalone-worker-configuration). For +[distributed mode](http://docs.confluent.io/3.0.0/connect/userguide.html#distributed-worker-configuration), the +connector configuration will stay the same. + +The steps to insert messages in Kafka topics and to run the Sink connector depend on whether you are using the +[Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs/) along with the Confluent platform. So please +follow the right set of steps below depending on your choice. + +##### Steps when using Schema Registry When using [Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs/) along with the [Confluent Platform](http://docs.confluent.io/3.0.0/platform.html), messages are inserted in Kafka topic as Avro records @@ -131,8 +141,13 @@ When using [Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs values as described in the section above. Binplace the file "connect-iothub-sink.properties" in the Kafka installation config folder (usually under KAFKA_HOME/etc). - 4. Update the Kafka Connect configuration file ("etc/kafka/connect-standalone.properties" or - "etc/kafka/connect-distributed.properties") to point to the Kafka bootstrap servers. + 4. Make the following updates to the Kafka Connect configuration file (typically "etc/kafka/connect-standalone.properties)" - + * Update bootstrap.servers to point to the Kafka server. + * Add the following setting to the file. This will make sure that the Kafka sink connector handles only 10 records + at a time, preventing Kafka connect from timing out the operation. + ``` + consumer.max.poll.records=10 + ``` 5. Make sure Kafka server, Zookeeper, and Schema Registry are running, as described [here](http://docs.confluent.io/3.0.0/quickstart.html) @@ -144,10 +159,6 @@ When using [Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs ``` bin/connect-standalone.sh config/connect-standalone.properties config/connect-iothub-sink.properties ``` - For distributed mode, the connector configuration will stay the same. For the detailed steps on how to do this, please - follow the - [Confluent Kafka Connect documentation](http://docs.confluent.io/3.0.0/connect/userguide.html#distributed-worker-configuration) - on this topic. 7. Insert messages to be sent to the IoT devices in the Kafka topic as Avro records. One way you can do that is using a [KafkaProducer](http://docs.confluent.io/3.0.0/clients/producer.html). Here is some sample code to send such messages to @@ -181,7 +192,7 @@ When using [Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs producer.send(producerRecord) ``` -#### Steps when not using Schema Registry +##### Steps when not using Schema Registry If you are using the standard Apache Kakfa without the Schema Registry integration, messages are inserted in Kafka topics as JSON strings, which are then deserialized by the Sink connector. @@ -200,11 +211,15 @@ Alternatively, you can directly download the jar file for Kafka Connect IoT Hub values as described in the section above. Binplace the file "connect-iothub-sink.properties" in the Kafka installation config folder (usually under KAFKA_HOME/config). -4. Make the following updates to the Kafka Connect configuration file ("config/connect-standalone.properties" or -"config/connect-distributed.properties") - +4. Make the following updates to the Kafka Connect configuration file (typically "config/connect-standalone.properties") - * Update bootstrap.servers to point to the Kafka server. * Update key.converter and value.converter to use org.apache.kafka.connect.storage.StringConverter (instead of the default org.apache.kafka.connect.json.JsonConverter) + * Add the following setting to the file. This will make sure that the Kafka sink connector handles only 10 records + at a time, preventing Kafka connect from timing out the operation. +``` +consumer.max.poll.records=10 +``` 5. Make sure Kafka server and Zookeeper are running, as described [here](https://kafka.apache.org/documentation#quickstart) @@ -215,10 +230,6 @@ messages from Kafka topic and send them to the IoT Devices - ``` bin/connect-standalone.sh config/connect-standalone.properties config/connect-iothub-sink.properties ``` -For distributed mode, the connector configuration will stay the same. For the detailed steps on how to do this, please -follow the -[Confluent Kafka Connect documentation](http://docs.confluent.io/3.0.0/connect/userguide.html#distributed-worker-configuration) -on this topic. 7. Insert messages to be sent to the IoT devices in the Kafka topic as JSON strings. One way you can do that is using a [KafkaProducer](https://kafka.apache.org/documentation/#producerapi). Here is some sample code to send such messages to @@ -243,3 +254,4 @@ on this topic. ## Future work * Add support to get feedback on the messages sent to the Azure IoT Devices. +* Add support for other means of communication from cloud-to-device (for e.g. direct methods) diff --git a/README_Source.md b/README_Source.md index f6a0700..d4b40d7 100644 --- a/README_Source.md +++ b/README_Source.md @@ -41,9 +41,9 @@ name=AzureIotHubConnector tasks.max=1 Kafka.Topic=IotTopic IotHub.EventHubCompatibleName=iothub-toketi -IotHub.EventHubCompatibleEndpoint=sb://iothub-ns-toketi-001.servicebus.windows.net/ +IotHub.EventHubCompatibleEndpoint=sb://iothub-001.servicebus.windows.net/ IotHub.AccessKeyName=service -IotHub.AccessKeyValue=6XdRSFB9H61f+N3uOdBJiKwzeqbZUj1K//T2jFyewN4= +IotHub.AccessKeyValue=4KsdfiB9J899a+N3iwerjKwzeqbZUj1K//KKj1ye9i3= IotHub.ConsumerGroup=$Default IotHub.Partitions=4 IotHub.StartTime=2016-11-28T00:00:00Z diff --git a/build.sbt b/build.sbt index ac13b39..d8b4050 100755 --- a/build.sbt +++ b/build.sbt @@ -19,7 +19,7 @@ libraryDependencies ++= { val scalaTestVersion = "3.0.0" val configVersion = "1.3.1" val json4sVersion = "3.5.0" - val iotHubServiceClientVersion = "1.0.11" + val iotHubServiceClientVersion = "1.0.12" Seq( "org.apache.kafka" % "connect-api" % kafkaVersion % "provided", @@ -28,7 +28,7 @@ libraryDependencies ++= { "com.microsoft.azure" % "azure-eventhubs" % azureEventHubSDKVersion, "org.json4s" %% "json4s-jackson" % json4sVersion, "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion, - "com.microsoft.azure.iothub-java-client" % "iothub-java-service-client" % iotHubServiceClientVersion, + "com.microsoft.azure.sdk.iot" % "iot-service-client" % iotHubServiceClientVersion, // Test dependencies "org.scalatest" %% "scalatest" % scalaTestVersion % "test", diff --git a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/C2DMessageConverter.scala b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/C2DMessageConverter.scala index bb4dfca..6b1b141 100644 --- a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/C2DMessageConverter.scala +++ b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/C2DMessageConverter.scala @@ -57,6 +57,29 @@ object C2DMessageConverter extends JsonSerialization { } + // Public for testing purposes + def validateStructSchema(schema: Schema): Unit = { + if (schema.`type`() != expectedSchema.`type`()) { + throw new ConnectException(s"Schema of Kafka record is of type ${schema.`type`().toString}, while expected " + + s"schema of type ${expectedSchema.`type`().toString}") + } + + for (expectedField ← expectedSchema.fields().asScala) { + val field = schema.field(expectedField.name()) + if (field != null) { + val expectedFieldSchema = expectedField.schema() + val fieldSchema = field.schema() + if (fieldSchema.`type`() != expectedFieldSchema.`type`()) { + throw new ConnectException(s"Schema type of Kafka record field ${field.name()} - ${fieldSchema.`type`()} " + + s"does not match the expected schema type ${expectedFieldSchema.`type`()}") + } + } + else if (!expectedField.schema().isOptional) { + throw new ConnectException(s"Schema of Kafka record does not contain required field ${expectedField.name()}") + } + } + } + private def validateStructSchemaAndGetMessage(record: SinkRecord, schema: Schema): C2DMessage = { validateStructSchema(schema) @@ -84,27 +107,4 @@ object C2DMessageConverter extends JsonSerialization { } None } - - // Public for testing purposes - def validateStructSchema(schema: Schema): Unit = { - if (schema.`type`() != expectedSchema.`type`()) { - throw new ConnectException(s"Schema of Kafka record is of type ${schema.`type`().toString}, while expected " + - s"schema of type ${expectedSchema.`type`().toString}") - } - - for (expectedField ← expectedSchema.fields().asScala) { - val field = schema.field(expectedField.name()) - if (field != null) { - val expectedFieldSchema = expectedField.schema() - val fieldSchema = field.schema() - if (fieldSchema.`type`() != expectedFieldSchema.`type`()) { - throw new ConnectException(s"Schema type of Kafka record field ${field.name()} - ${fieldSchema.`type`()} " + - s"does not match the expected schema type ${expectedFieldSchema.`type`()}") - } - } - else if (!expectedField.schema().isOptional) { - throw new ConnectException(s"Schema of Kafka record does not contain required field ${expectedField.name()}") - } - } - } } diff --git a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubMessageSender.scala b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubMessageSender.scala index 8681152..f2db3b1 100644 --- a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubMessageSender.scala +++ b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubMessageSender.scala @@ -4,9 +4,7 @@ package com.microsoft.azure.iot.kafka.connect.sink -import java.util.concurrent.CompletableFuture - -import com.microsoft.azure.iot.service.sdk.{IotHubServiceClientProtocol, Message, ServiceClient} +import com.microsoft.azure.sdk.iot.service.sdk.{IotHubServiceClientProtocol, Message, ServiceClient} class IotHubMessageSender(connectionString: String) extends MessageSender { @@ -14,11 +12,11 @@ class IotHubMessageSender(connectionString: String) extends MessageSender { IotHubServiceClientProtocol.AMQPS) this.serviceClient.open() - def sendMessage(deviceId: String, message: Message): CompletableFuture[Void] = { - this.serviceClient.sendAsync(deviceId, message) + def sendMessage(deviceId: String, message: Message): Unit = { + this.serviceClient.send(deviceId, message) } def close(): Unit = { - this.serviceClient.closeAsync() + this.serviceClient.close() } } diff --git a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkConfig.scala b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkConfig.scala index dff8e2a..3cbdb1e 100644 --- a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkConfig.scala +++ b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkConfig.scala @@ -6,7 +6,7 @@ package com.microsoft.azure.iot.kafka.connect.sink import java.util.Map -import com.microsoft.azure.iot.service.sdk.DeliveryAcknowledgement +import com.microsoft.azure.sdk.iot.service.sdk.DeliveryAcknowledgement import org.apache.kafka.common.config.ConfigDef.{Importance, Type, Width} import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} diff --git a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkConnector.scala b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkConnector.scala index 01b4598..784316d 100644 --- a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkConnector.scala +++ b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkConnector.scala @@ -22,8 +22,7 @@ class IotHubSinkConnector extends SinkConnector with LazyLogging with JsonSerial override def taskClass(): Class[_ <: Task] = classOf[IotHubSinkTask] override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = { - val configList = (1 to maxTasks).map(c => this.props.asJava).toList.asJava - configList + (1 to maxTasks).map(c => this.props.asJava).toList.asJava } override def stop(): Unit = { diff --git a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkTask.scala b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkTask.scala index d9f29ad..2a0a1a7 100644 --- a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkTask.scala +++ b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkTask.scala @@ -5,21 +5,15 @@ package com.microsoft.azure.iot.kafka.connect.sink import java.util -import java.util.concurrent.CompletableFuture import com.microsoft.azure.iot.kafka.connect.JsonSerialization -import com.microsoft.azure.iot.service.sdk.{DeliveryAcknowledgement, Message} +import com.microsoft.azure.sdk.iot.service.sdk.{DeliveryAcknowledgement, Message} import com.typesafe.scalalogging.LazyLogging import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import org.apache.kafka.connect.sink.{SinkRecord, SinkTask} import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{Await, Future, TimeoutException} -import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global class IotHubSinkTask extends SinkTask with LazyLogging with JsonSerialization { @@ -27,14 +21,12 @@ class IotHubSinkTask extends SinkTask with LazyLogging with JsonSerialization { protected var messageSender : Option[MessageSender] = None protected var acknowledgement : DeliveryAcknowledgement = DeliveryAcknowledgement.None private[this] var isClosing : Boolean = false - private[this] var sendMessageFutures: ArrayBuffer[CompletableFuture[Void]] = mutable.ArrayBuffer.empty[CompletableFuture[Void]] override def stop(): Unit = { logger.info("Stopping IotHubSink Task") if (this.messageSender.isDefined && !this.isClosing) { this.messageSender.synchronized { if (!this.isClosing) { - this.waitForAllMessages() this.isClosing = true logger.info("Closing IotHub clients") this.messageSender.get.close() @@ -69,38 +61,10 @@ class IotHubSinkTask extends SinkTask with LazyLogging with JsonSerialization { if (c2DMessage.expiryTime.isDefined) { message.setExpiryTimeUtc(c2DMessage.expiryTime.get) } - logger.debug(s"Sending Message to Device - $message") - this.sendMessageFutures += this.messageSender.get.sendMessage(c2DMessage.deviceId, message) + this.messageSender.get.sendMessage(c2DMessage.deviceId, message) } - override def flush(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = { - logger.info("Flushing IotHubSink Task") - this.waitForAllMessages() - } - - // Wait for pending tasks to complete. If some of the tasks take too long, cancel the remaining tasks to avoid Kafka - // Connect from timing out (default timeout is 30 seconds). - private def waitForAllMessages(): Unit = { - logger.info("Waiting for all send message tasks to complete") - - try { - val waitJob = Future { - CompletableFuture.allOf(this.sendMessageFutures: _*).join() - } - Await.result(waitJob, 20.seconds) - } catch { - case tex: TimeoutException ⇒ { - val completedFutures = this.sendMessageFutures.count(f ⇒ f.isDone) - val pendingFutures = this.sendMessageFutures.size - completedFutures - - logger.error("Got timeout exception while waiting for send message tasks. Ignoring the pending tasks to avoid" + - " Kakfa Connect from timing out. " + - s"There are $completedFutures completed tasks and $pendingFutures pending tasks.") - } - } - this.sendMessageFutures = mutable.ArrayBuffer.empty[CompletableFuture[Void]] - logger.info(s"Done waiting for all send message tasks") - } + override def flush(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {} override def start(props: util.Map[String, String]): Unit = { logger.info("Starting IotHub Sink") diff --git a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/MessageSender.scala b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/MessageSender.scala index 42b8fd3..88027d4 100644 --- a/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/MessageSender.scala +++ b/src/main/scala/com/microsoft/azure/iot/kafka/connect/sink/MessageSender.scala @@ -4,12 +4,10 @@ package com.microsoft.azure.iot.kafka.connect.sink -import java.util.concurrent.CompletableFuture - -import com.microsoft.azure.iot.service.sdk.Message +import com.microsoft.azure.sdk.iot.service.sdk.Message trait MessageSender { - def sendMessage(deviceId: String, message: Message): CompletableFuture[Void] + def sendMessage(deviceId: String, message: Message) def close() } diff --git a/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkTaskTest.scala b/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkTaskTest.scala index 8a9e414..0703903 100644 --- a/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkTaskTest.scala +++ b/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/IotHubSinkTaskTest.scala @@ -8,7 +8,7 @@ import java.util.function._ import com.microsoft.azure.iot.kafka.connect.JsonSerialization import com.microsoft.azure.iot.kafka.connect.sink.testhelpers.{SinkTestConfig, TestIotHubSinkTask, TestSinkRecords} -import com.microsoft.azure.iot.service.sdk.DeliveryAcknowledgement +import com.microsoft.azure.sdk.iot.service.sdk.DeliveryAcknowledgement import org.apache.kafka.connect.data.Struct import org.apache.kafka.connect.errors.ConnectException import org.apache.kafka.connect.sink.SinkRecord @@ -42,18 +42,10 @@ class IotHubSinkTaskTest extends FlatSpec with GivenWhenThen with JsonSerializat Then("It sends the records to the right destination") Thread.sleep(1000) - var sentRecords = iotHubSinkTask.getSentMessages() + val sentRecords = iotHubSinkTask.getSentMessages() - When("Not all records have been sent and flush is called") - assert(sentRecords.size < sinkRecords.size()) - iotHubSinkTask.flush(null) - - Then("All records are sent before returning, and all messages contain all the right information") - sentRecords = iotHubSinkTask.getSentMessages() assert(sentRecords.size == sinkRecords.size()) - val sentRecordsIterator = sentRecords.iterator() - while (sentRecordsIterator.hasNext) { - val sentRecord = sentRecordsIterator.next() + for (sentRecord ← sentRecords) { val predicate = (r: SinkRecord) ⇒ r.value().asInstanceOf[Struct].getString("messageId") == sentRecord.getMessageId val sinkRecord = sinkRecords.stream().filter(predicate).findAny() assert(sinkRecord != null && sinkRecord.isPresent) @@ -74,18 +66,10 @@ class IotHubSinkTaskTest extends FlatSpec with GivenWhenThen with JsonSerializat Then("It sends the records to the right destination") Thread.sleep(1000) - var sentRecords = iotHubSinkTask.getSentMessages() + val sentRecords = iotHubSinkTask.getSentMessages() - When("Not all records have been sent and flush is called") - assert(sentRecords.size < sinkRecords.size()) - iotHubSinkTask.flush(null) - - Then("All records are sent before returning, and all messages contain all the right information") - sentRecords = iotHubSinkTask.getSentMessages() assert(sentRecords.size == sinkRecords.size()) - val sentRecordsIterator = sentRecords.iterator() - while (sentRecordsIterator.hasNext) { - val sentRecord = sentRecordsIterator.next() + for (sentRecord ← sentRecords) { val predicate = (r: SinkRecord) ⇒ r.value().asInstanceOf[String].contains(sentRecord.getMessageId) val sinkRecord = sinkRecords.stream().filter(predicate).findAny() assert(sinkRecord != null && sinkRecord.isPresent) diff --git a/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/MockMessageSender.scala b/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/MockMessageSender.scala index dc39e1f..3e617e8 100644 --- a/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/MockMessageSender.scala +++ b/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/MockMessageSender.scala @@ -4,26 +4,17 @@ package com.microsoft.azure.iot.kafka.connect.sink.testhelpers -import java.util -import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, ExecutorService, Executors} - import com.microsoft.azure.iot.kafka.connect.sink.MessageSender -import com.microsoft.azure.iot.service.sdk.Message +import com.microsoft.azure.sdk.iot.service.sdk.Message + +import scala.collection.mutable.ArrayBuffer class MockMessageSender(connectionString: String) extends MessageSender { - private val executor: ExecutorService = Executors.newFixedThreadPool(5) - private var messageList: Option[ConcurrentLinkedQueue[Message]] = Some(new ConcurrentLinkedQueue[Message]) + private var messageList: Option[ArrayBuffer[Message]] = Some(ArrayBuffer.empty[Message]) - override def sendMessage(deviceId: String, message: Message): CompletableFuture[Void] = { - val messageListValue = messageList.get - val future = CompletableFuture.runAsync(new Runnable { - override def run(): Unit = { - Thread.sleep(2000) - messageListValue.add(message) - } - }, executor) - future + override def sendMessage(deviceId: String, message: Message): Unit = { + messageList.get += message } override def close(): Unit = { @@ -31,5 +22,5 @@ class MockMessageSender(connectionString: String) extends MessageSender { messageList = None } - def getSentMessages(): util.Collection[Message] = messageList.get + def getSentMessages(): ArrayBuffer[Message] = messageList.get } \ No newline at end of file diff --git a/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/SinkTestConfig.scala b/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/SinkTestConfig.scala index 18415db..ffb65e4 100644 --- a/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/SinkTestConfig.scala +++ b/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/SinkTestConfig.scala @@ -7,7 +7,7 @@ package com.microsoft.azure.iot.kafka.connect.sink.testhelpers import java.util import com.microsoft.azure.iot.kafka.connect.sink.IotHubSinkConfig -import com.microsoft.azure.iot.service.sdk.DeliveryAcknowledgement +import com.microsoft.azure.sdk.iot.service.sdk.DeliveryAcknowledgement import com.typesafe.config.ConfigFactory object SinkTestConfig { diff --git a/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/TestIotHubSinkTask.scala b/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/TestIotHubSinkTask.scala index 4c040ad..228a63a 100644 --- a/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/TestIotHubSinkTask.scala +++ b/src/test/scala/com/microsoft/azure/iot/kafka/connect/sink/testhelpers/TestIotHubSinkTask.scala @@ -4,14 +4,14 @@ package com.microsoft.azure.iot.kafka.connect.sink.testhelpers -import java.util - import com.microsoft.azure.iot.kafka.connect.sink.{IotHubSinkTask, MessageSender} -import com.microsoft.azure.iot.service.sdk.{DeliveryAcknowledgement, Message} +import com.microsoft.azure.sdk.iot.service.sdk.{DeliveryAcknowledgement, Message} + +import scala.collection.mutable.ArrayBuffer class TestIotHubSinkTask extends IotHubSinkTask { - def getSentMessages(): util.Collection[Message] = this.messageSender.get.asInstanceOf[MockMessageSender].getSentMessages() + def getSentMessages(): ArrayBuffer[Message] = this.messageSender.get.asInstanceOf[MockMessageSender].getSentMessages() def getDeliveryAcknowledgement(): DeliveryAcknowledgement = this.acknowledgement