Addressing PR feedback and changing to use the sync version of ServiceClient.sendMessage

- Several minor updates to address PR feedback
- Changed to use the sync version of ServiceClient.sendMessage, as the async one has some bugs (see the sketch below)
- Made some updates to the README.
Varun 2017-01-30 14:10:40 -08:00
Parent 67a70e6fb3
Commit 515c2f135d
14 changed files with 95 additions and 148 deletions
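For context, a minimal sketch of the synchronous send path the connector now uses, assuming the standard `ServiceClient.createFromConnectionString` factory from the `com.microsoft.azure.sdk.iot.service.sdk` package referenced in the diffs below (the connection string and device id are placeholders):

```scala
import com.microsoft.azure.sdk.iot.service.sdk.{IotHubServiceClientProtocol, Message, ServiceClient}

object SyncSendSketch extends App {
  // Placeholder connection string; the connector reads it from IotHub.ConnectionString.
  val connectionString =
    "HostName=Test.azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=<key>"

  val serviceClient =
    ServiceClient.createFromConnectionString(connectionString, IotHubServiceClientProtocol.AMQPS)
  serviceClient.open()

  // send() blocks until IoT Hub accepts the message, whereas sendAsync()
  // returns a CompletableFuture[Void] that has to be tracked and awaited.
  serviceClient.send("device1", new Message("test message"))

  serviceClient.close()
}
```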

View file

@@ -5,8 +5,9 @@ Kafka Connect Azure IoT Hub consists of 2 connectors - a source connector and a
is used to pump data from [Azure IoT Hub](https://azure.microsoft.com/en-us/services/iot-hub/) to
[Apache Kafka](https://kafka.apache.org/), whereas the sink connector reads messages from Kafka and sends them to IoT
devices via [Azure IoT Hub](https://azure.microsoft.com/en-us/services/iot-hub/). When used in tandem, the 2
connectors allow communicating with IoT devices by simply posting and reading messages to Kafka topics. This should
makie it easier for open source systems and other systems that already interface with Kafka to communicate with
connectors allow communicating with IoT devices by simply posting and reading messages to/from Kafka topics. This
should
make it easier for open source systems and other systems that already interface with Kafka to communicate with
Azure IoT devices.
For more information on the capabilities of the connectors and how to use them, please refer to the links below -

View file

@@ -3,9 +3,17 @@ ________________________
The Sink connector allows you to send messages to Azure IoT devices by simply posting messages to Kafka topics. The
connector sends the messages to Azure IoT Hub, which in turn forwards them to the right devices. The
messages need to be in a specific format (details below), that allow the connector to extract the information
messages need to be in a specific format (details below), that allows the connector to extract the information
necessary to send them to the right devices.
> Note: At the moment, the Sink connector only supports C2D messages, and does not support other means of communication.
> See
> [Cloud-to-device communications guidance](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-c2d-guidance)
> to see what that means. Also, the connector does not support getting feedback from the devices on the status of the
> messages (whether accepted, rejected or expired). If you want to get feedback, you will need to do it manually.
> Please refer to the documentation [here](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-java-java-c2d) on
> how to do that.
### Configuration
All the configuration settings required to run Kafka sink connector for IoT Hub are in the
@@ -32,20 +40,15 @@ connector.class=com.microsoft.azure.iot.kafka.connect.sink.IotHubSinkConnector
name=AzureIotHubSinkConnector
tasks.max=1
topics=testtopic
IotHub.ConnectionString=HostName=Test.azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=aBCdeBb5HffTTs/J9ikdcqab1JNMB0ot=
IotHub.ConnectionString=HostName=Test.azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=aBCdeBsdfwfTTs/isuwselskab1Jksjdsot=
IotHub.MessageDeliveryAcknowledgement=None
```
> Note: At the moment, the Sink connector does not support getting feedback from the devices on the status of the
> messages (whether accepted or rejected). If you want to get feedback, you will need to do it manually. Please refer
> to the documentation [here](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-java-java-c2d) on how to do that.
### Data format
The sink connector expects the messages in the Kafka topic to have the schema below, so that it can extract the
information required to send the message to the IoT device. If a record is not in the expected format, the connector
will throw an exception.
will throw an exception, and will have to be re-started manually.
```json
{
@@ -107,11 +110,18 @@ use the right Azure IoT Hub and Kafka settings. For more details on using Kafka
* You also need an Apache Kafka v0.10 installation running that contains messages to be sent to the IoT devices in
one or more topics. Get started with Kafka [here](http://docs.confluent.io/3.0.0/quickstart.html).
The steps to insert messages in Kafka topics and to run the Sink connector depend on whether you are using the
[Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs/) along with the Confluent platform. So please
follow the right set of steps below depending on your choice.
#### Steps
#### Steps when using Schema Registry
Here are the steps to run the Kafka Connect IoT Hub Sink Connector in
[standalone mode](http://docs.confluent.io/3.0.0/connect/userguide.html#standalone-worker-configuration). For
[distributed mode](http://docs.confluent.io/3.0.0/connect/userguide.html#distributed-worker-configuration), the
connector configuration will stay the same.
The steps to insert messages in Kafka topics and to run the Sink connector depend on whether you are using the
[Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs/) along with the Confluent platform. So please
follow the right set of steps below depending on your choice.
##### Steps when using Schema Registry
When using [Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs/) along with the
[Confluent Platform](http://docs.confluent.io/3.0.0/platform.html), messages are inserted in Kafka topic as Avro records
@@ -131,8 +141,13 @@ When using [Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs
values as described in the section above. Binplace the file "connect-iothub-sink.properties" in the Kafka
installation config folder (usually under KAFKA_HOME/etc).
4. Update the Kafka Connect configuration file ("etc/kafka/connect-standalone.properties" or
"etc/kafka/connect-distributed.properties") to point to the Kafka bootstrap servers.
4. Make the following updates to the Kafka Connect configuration file (typically "etc/kafka/connect-standalone.properties") -
* Update bootstrap.servers to point to the Kafka server.
* Add the following setting to the file. This will make sure that the Kafka sink connector handles only 10 records
at a time, preventing Kafka Connect from timing out the operation.
```
consumer.max.poll.records=10
```
5. Make sure Kafka server, Zookeeper, and Schema Registry are running, as described
[here](http://docs.confluent.io/3.0.0/quickstart.html)
@@ -144,10 +159,6 @@ When using [Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs
```
bin/connect-standalone.sh config/connect-standalone.properties config/connect-iothub-sink.properties
```
For distributed mode, the connector configuration will stay the same. For the detailed steps on how to do this, please
follow the
[Confluent Kafka Connect documentation](http://docs.confluent.io/3.0.0/connect/userguide.html#distributed-worker-configuration)
on this topic.
7. Insert messages to be sent to the IoT devices in the Kafka topic as Avro records. One way you can do that is using a
[KafkaProducer](http://docs.confluent.io/3.0.0/clients/producer.html). Here is some sample code to send such messages to
@@ -181,7 +192,7 @@ When using [Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs
producer.send(producerRecord)
```
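A self-contained sketch of such a producer (the field names deviceId/message/messageId/expiryTime are assumed from the connector's expected schema; the broker address, Schema Registry URL, and topic are placeholders):

```scala
import java.util.Properties
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object AvroProducerSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder broker
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", "http://localhost:8081") // placeholder registry URL

  // Schema with the fields the sink connector expects (names assumed).
  val schema = SchemaBuilder.record("C2DMessage").fields()
    .requiredString("deviceId")
    .requiredString("message")
    .requiredString("messageId")
    .optionalString("expiryTime")
    .endRecord()

  val record: GenericRecord = new GenericData.Record(schema)
  record.put("deviceId", "device1")
  record.put("message", "turn on")
  record.put("messageId", "msg-001")

  val producer = new KafkaProducer[String, GenericRecord](props)
  producer.send(new ProducerRecord[String, GenericRecord]("testtopic", record))
  producer.close()
}
```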
#### Steps when not using Schema Registry
##### Steps when not using Schema Registry
If you are using the standard Apache Kafka without the Schema Registry integration, messages are inserted in Kafka
topics as JSON strings, which are then deserialized by the Sink connector.
@@ -200,11 +211,15 @@ Alternatively, you can directly download the jar file for Kafka Connect IoT Hub
values as described in the section above. Binplace the file "connect-iothub-sink.properties" in the Kafka
installation config folder (usually under KAFKA_HOME/config).
4. Make the following updates to the Kafka Connect configuration file ("config/connect-standalone.properties" or
"config/connect-distributed.properties") -
4. Make the following updates to the Kafka Connect configuration file (typically "config/connect-standalone.properties") -
* Update bootstrap.servers to point to the Kafka server.
* Update key.converter and value.converter to use org.apache.kafka.connect.storage.StringConverter (instead of the
default org.apache.kafka.connect.json.JsonConverter)
* Add the following setting to the file. This will make sure that the Kafka sink connector handles only 10 records
at a time, preventing Kafka Connect from timing out the operation.
```
consumer.max.poll.records=10
```
5. Make sure Kafka server and Zookeeper are running, as described [here](https://kafka.apache.org/documentation#quickstart)
@@ -215,10 +230,6 @@ messages from Kafka topic and send them to the IoT Devices -
```
bin/connect-standalone.sh config/connect-standalone.properties config/connect-iothub-sink.properties
```
For distributed mode, the connector configuration will stay the same. For the detailed steps on how to do this, please
follow the
[Confluent Kafka Connect documentation](http://docs.confluent.io/3.0.0/connect/userguide.html#distributed-worker-configuration)
on this topic.
7. Insert messages to be sent to the IoT devices in the Kafka topic as JSON strings. One way you can do that is using a
[KafkaProducer](https://kafka.apache.org/documentation/#producerapi). Here is some sample code to send such messages to
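A comparable minimal sketch of producing such JSON strings (the payload field names are assumed from the connector's expected schema; the broker address and topic are placeholders):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object JsonProducerSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder broker
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  // JSON payload with the fields the sink connector expects (names assumed).
  val json = """{"deviceId": "device1", "message": "turn on", "messageId": "msg-001"}"""

  val producer = new KafkaProducer[String, String](props)
  producer.send(new ProducerRecord[String, String]("testtopic", json))
  producer.close()
}
```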
@@ -243,3 +254,4 @@ on this topic.
## Future work
* Add support to get feedback on the messages sent to the Azure IoT Devices.
* Add support for other means of communication from cloud-to-device (e.g. direct methods)

View file

@@ -41,9 +41,9 @@ name=AzureIotHubConnector
tasks.max=1
Kafka.Topic=IotTopic
IotHub.EventHubCompatibleName=iothub-toketi
IotHub.EventHubCompatibleEndpoint=sb://iothub-ns-toketi-001.servicebus.windows.net/
IotHub.EventHubCompatibleEndpoint=sb://iothub-001.servicebus.windows.net/
IotHub.AccessKeyName=service
IotHub.AccessKeyValue=6XdRSFB9H61f+N3uOdBJiKwzeqbZUj1K//T2jFyewN4=
IotHub.AccessKeyValue=4KsdfiB9J899a+N3iwerjKwzeqbZUj1K//KKj1ye9i3=
IotHub.ConsumerGroup=$Default
IotHub.Partitions=4
IotHub.StartTime=2016-11-28T00:00:00Z

View file

@@ -19,7 +19,7 @@ libraryDependencies ++= {
val scalaTestVersion = "3.0.0"
val configVersion = "1.3.1"
val json4sVersion = "3.5.0"
val iotHubServiceClientVersion = "1.0.11"
val iotHubServiceClientVersion = "1.0.12"
Seq(
"org.apache.kafka" % "connect-api" % kafkaVersion % "provided",
@@ -28,7 +28,7 @@ libraryDependencies ++= {
"com.microsoft.azure" % "azure-eventhubs" % azureEventHubSDKVersion,
"org.json4s" %% "json4s-jackson" % json4sVersion,
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion,
"com.microsoft.azure.iothub-java-client" % "iothub-java-service-client" % iotHubServiceClientVersion,
"com.microsoft.azure.sdk.iot" % "iot-service-client" % iotHubServiceClientVersion,
// Test dependencies
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",

View file

@@ -57,6 +57,29 @@ object C2DMessageConverter extends JsonSerialization {
}
// Public for testing purposes
def validateStructSchema(schema: Schema): Unit = {
if (schema.`type`() != expectedSchema.`type`()) {
throw new ConnectException(s"Schema of Kafka record is of type ${schema.`type`().toString}, while expected " +
s"schema of type ${expectedSchema.`type`().toString}")
}
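// Every expected field must be present with a matching schema type; a field may be absent only if it is optional.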
for (expectedField <- expectedSchema.fields().asScala) {
val field = schema.field(expectedField.name())
if (field != null) {
val expectedFieldSchema = expectedField.schema()
val fieldSchema = field.schema()
if (fieldSchema.`type`() != expectedFieldSchema.`type`()) {
throw new ConnectException(s"Schema type of Kafka record field ${field.name()} - ${fieldSchema.`type`()} " +
s"does not match the expected schema type ${expectedFieldSchema.`type`()}")
}
}
else if (!expectedField.schema().isOptional) {
throw new ConnectException(s"Schema of Kafka record does not contain required field ${expectedField.name()}")
}
}
}
private def validateStructSchemaAndGetMessage(record: SinkRecord, schema: Schema): C2DMessage = {
validateStructSchema(schema)
@@ -84,27 +107,4 @@ object C2DMessageConverter extends JsonSerialization {
}
None
}
// Public for testing purposes
def validateStructSchema(schema: Schema): Unit = {
if (schema.`type`() != expectedSchema.`type`()) {
throw new ConnectException(s"Schema of Kafka record is of type ${schema.`type`().toString}, while expected " +
s"schema of type ${expectedSchema.`type`().toString}")
}
for (expectedField <- expectedSchema.fields().asScala) {
val field = schema.field(expectedField.name())
if (field != null) {
val expectedFieldSchema = expectedField.schema()
val fieldSchema = field.schema()
if (fieldSchema.`type`() != expectedFieldSchema.`type`()) {
throw new ConnectException(s"Schema type of Kafka record field ${field.name()} - ${fieldSchema.`type`()} " +
s"does not match the expected schema type ${expectedFieldSchema.`type`()}")
}
}
else if (!expectedField.schema().isOptional) {
throw new ConnectException(s"Schema of Kafka record does not contain required field ${expectedField.name()}")
}
}
}
}

View file

@@ -4,9 +4,7 @@
package com.microsoft.azure.iot.kafka.connect.sink
import java.util.concurrent.CompletableFuture
import com.microsoft.azure.iot.service.sdk.{IotHubServiceClientProtocol, Message, ServiceClient}
import com.microsoft.azure.sdk.iot.service.sdk.{IotHubServiceClientProtocol, Message, ServiceClient}
class IotHubMessageSender(connectionString: String) extends MessageSender {
@@ -14,11 +12,11 @@ class IotHubMessageSender(connectionString: String) extends MessageSender {
IotHubServiceClientProtocol.AMQPS)
this.serviceClient.open()
def sendMessage(deviceId: String, message: Message): CompletableFuture[Void] = {
this.serviceClient.sendAsync(deviceId, message)
def sendMessage(deviceId: String, message: Message): Unit = {
this.serviceClient.send(deviceId, message)
}
def close(): Unit = {
this.serviceClient.closeAsync()
this.serviceClient.close()
}
}

View file

@ -6,7 +6,7 @@ package com.microsoft.azure.iot.kafka.connect.sink
import java.util.Map
import com.microsoft.azure.iot.service.sdk.DeliveryAcknowledgement
import com.microsoft.azure.sdk.iot.service.sdk.DeliveryAcknowledgement
import org.apache.kafka.common.config.ConfigDef.{Importance, Type, Width}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}

View file

@@ -22,8 +22,7 @@ class IotHubSinkConnector extends SinkConnector with LazyLogging with JsonSerial
override def taskClass(): Class[_ <: Task] = classOf[IotHubSinkTask]
override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
val configList = (1 to maxTasks).map(c => this.props.asJava).toList.asJava
configList
(1 to maxTasks).map(c => this.props.asJava).toList.asJava
}
override def stop(): Unit = {

View file

@@ -5,21 +5,15 @@
package com.microsoft.azure.iot.kafka.connect.sink
import java.util
import java.util.concurrent.CompletableFuture
import com.microsoft.azure.iot.kafka.connect.JsonSerialization
import com.microsoft.azure.iot.service.sdk.{DeliveryAcknowledgement, Message}
import com.microsoft.azure.sdk.iot.service.sdk.{DeliveryAcknowledgement, Message}
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.connect.sink.{SinkRecord, SinkTask}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
class IotHubSinkTask extends SinkTask with LazyLogging with JsonSerialization {
@@ -27,14 +21,12 @@ class IotHubSinkTask extends SinkTask with LazyLogging with JsonSerialization {
protected var messageSender : Option[MessageSender] = None
protected var acknowledgement : DeliveryAcknowledgement = DeliveryAcknowledgement.None
private[this] var isClosing : Boolean = false
private[this] var sendMessageFutures: ArrayBuffer[CompletableFuture[Void]] = mutable.ArrayBuffer.empty[CompletableFuture[Void]]
override def stop(): Unit = {
logger.info("Stopping IotHubSink Task")
if (this.messageSender.isDefined && !this.isClosing) {
this.messageSender.synchronized {
if (!this.isClosing) {
this.waitForAllMessages()
this.isClosing = true
logger.info("Closing IotHub clients")
this.messageSender.get.close()
@@ -69,38 +61,10 @@ class IotHubSinkTask extends SinkTask with LazyLogging with JsonSerialization {
if (c2DMessage.expiryTime.isDefined) {
message.setExpiryTimeUtc(c2DMessage.expiryTime.get)
}
logger.debug(s"Sending Message to Device - $message")
this.sendMessageFutures += this.messageSender.get.sendMessage(c2DMessage.deviceId, message)
this.messageSender.get.sendMessage(c2DMessage.deviceId, message)
}
override def flush(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
logger.info("Flushing IotHubSink Task")
this.waitForAllMessages()
}
// Wait for pending tasks to complete. If some of the tasks take too long, cancel the remaining tasks to avoid Kafka
// Connect from timing out (default timeout is 30 seconds).
private def waitForAllMessages(): Unit = {
logger.info("Waiting for all send message tasks to complete")
try {
val waitJob = Future {
CompletableFuture.allOf(this.sendMessageFutures: _*).join()
}
Await.result(waitJob, 20.seconds)
} catch {
case tex: TimeoutException => {
val completedFutures = this.sendMessageFutures.count(f => f.isDone)
val pendingFutures = this.sendMessageFutures.size - completedFutures
logger.error("Got timeout exception while waiting for send message tasks. Ignoring the pending tasks to avoid" +
" Kakfa Connect from timing out. " +
s"There are $completedFutures completed tasks and $pendingFutures pending tasks.")
}
}
this.sendMessageFutures = mutable.ArrayBuffer.empty[CompletableFuture[Void]]
logger.info(s"Done waiting for all send message tasks")
}
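// With synchronous sends, each record has already been delivered by the time put() returns, so flush() has nothing to wait for.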
override def flush(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {}
override def start(props: util.Map[String, String]): Unit = {
logger.info("Starting IotHub Sink")

View file

@@ -4,12 +4,10 @@
package com.microsoft.azure.iot.kafka.connect.sink
import java.util.concurrent.CompletableFuture
import com.microsoft.azure.iot.service.sdk.Message
import com.microsoft.azure.sdk.iot.service.sdk.Message
trait MessageSender {
def sendMessage(deviceId: String, message: Message): CompletableFuture[Void]
def sendMessage(deviceId: String, message: Message)
def close()
}

View file

@@ -8,7 +8,7 @@ import java.util.function._
import com.microsoft.azure.iot.kafka.connect.JsonSerialization
import com.microsoft.azure.iot.kafka.connect.sink.testhelpers.{SinkTestConfig, TestIotHubSinkTask, TestSinkRecords}
import com.microsoft.azure.iot.service.sdk.DeliveryAcknowledgement
import com.microsoft.azure.sdk.iot.service.sdk.DeliveryAcknowledgement
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
@@ -42,18 +42,10 @@ class IotHubSinkTaskTest extends FlatSpec with GivenWhenThen with JsonSerializat
Then("It sends the records to the right destination")
Thread.sleep(1000)
var sentRecords = iotHubSinkTask.getSentMessages()
val sentRecords = iotHubSinkTask.getSentMessages()
When("Not all records have been sent and flush is called")
assert(sentRecords.size < sinkRecords.size())
iotHubSinkTask.flush(null)
Then("All records are sent before returning, and all messages contain all the right information")
sentRecords = iotHubSinkTask.getSentMessages()
assert(sentRecords.size == sinkRecords.size())
val sentRecordsIterator = sentRecords.iterator()
while (sentRecordsIterator.hasNext) {
val sentRecord = sentRecordsIterator.next()
for (sentRecord <- sentRecords) {
val predicate = (r: SinkRecord) => r.value().asInstanceOf[Struct].getString("messageId") == sentRecord.getMessageId
val sinkRecord = sinkRecords.stream().filter(predicate).findAny()
assert(sinkRecord != null && sinkRecord.isPresent)
@@ -74,18 +66,10 @@ class IotHubSinkTaskTest extends FlatSpec with GivenWhenThen with JsonSerializat
Then("It sends the records to the right destination")
Thread.sleep(1000)
var sentRecords = iotHubSinkTask.getSentMessages()
val sentRecords = iotHubSinkTask.getSentMessages()
When("Not all records have been sent and flush is called")
assert(sentRecords.size < sinkRecords.size())
iotHubSinkTask.flush(null)
Then("All records are sent before returning, and all messages contain all the right information")
sentRecords = iotHubSinkTask.getSentMessages()
assert(sentRecords.size == sinkRecords.size())
val sentRecordsIterator = sentRecords.iterator()
while (sentRecordsIterator.hasNext) {
val sentRecord = sentRecordsIterator.next()
for (sentRecord <- sentRecords) {
val predicate = (r: SinkRecord) => r.value().asInstanceOf[String].contains(sentRecord.getMessageId)
val sinkRecord = sinkRecords.stream().filter(predicate).findAny()
assert(sinkRecord != null && sinkRecord.isPresent)

View file

@@ -4,26 +4,17 @@
package com.microsoft.azure.iot.kafka.connect.sink.testhelpers
import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, ExecutorService, Executors}
import com.microsoft.azure.iot.kafka.connect.sink.MessageSender
import com.microsoft.azure.iot.service.sdk.Message
import com.microsoft.azure.sdk.iot.service.sdk.Message
import scala.collection.mutable.ArrayBuffer
class MockMessageSender(connectionString: String) extends MessageSender {
private val executor: ExecutorService = Executors.newFixedThreadPool(5)
private var messageList: Option[ConcurrentLinkedQueue[Message]] = Some(new ConcurrentLinkedQueue[Message])
private var messageList: Option[ArrayBuffer[Message]] = Some(ArrayBuffer.empty[Message])
override def sendMessage(deviceId: String, message: Message): CompletableFuture[Void] = {
val messageListValue = messageList.get
val future = CompletableFuture.runAsync(new Runnable {
override def run(): Unit = {
Thread.sleep(2000)
messageListValue.add(message)
}
}, executor)
future
override def sendMessage(deviceId: String, message: Message): Unit = {
messageList.get += message
}
override def close(): Unit = {
@@ -31,5 +22,5 @@ class MockMessageSender(connectionString: String) extends MessageSender {
messageList = None
}
def getSentMessages(): util.Collection[Message] = messageList.get
def getSentMessages(): ArrayBuffer[Message] = messageList.get
}

View file

@@ -7,7 +7,7 @@ package com.microsoft.azure.iot.kafka.connect.sink.testhelpers
import java.util
import com.microsoft.azure.iot.kafka.connect.sink.IotHubSinkConfig
import com.microsoft.azure.iot.service.sdk.DeliveryAcknowledgement
import com.microsoft.azure.sdk.iot.service.sdk.DeliveryAcknowledgement
import com.typesafe.config.ConfigFactory
object SinkTestConfig {

View file

@@ -4,14 +4,14 @@
package com.microsoft.azure.iot.kafka.connect.sink.testhelpers
import java.util
import com.microsoft.azure.iot.kafka.connect.sink.{IotHubSinkTask, MessageSender}
import com.microsoft.azure.iot.service.sdk.{DeliveryAcknowledgement, Message}
import com.microsoft.azure.sdk.iot.service.sdk.{DeliveryAcknowledgement, Message}
import scala.collection.mutable.ArrayBuffer
class TestIotHubSinkTask extends IotHubSinkTask {
def getSentMessages(): util.Collection[Message] = this.messageSender.get.asInstanceOf[MockMessageSender].getSentMessages()
def getSentMessages(): ArrayBuffer[Message] = this.messageSender.get.asInstanceOf[MockMessageSender].getSentMessages()
def getDeliveryAcknowledgement(): DeliveryAcknowledgement = this.acknowledgement