Update README for Sink and handle wait timeout
- Updated the Sink README to cover both the case when a Schema Registry is being used and the case when it is not. - Also handled the case where Flush waits too long for the send tasks to complete.
This commit is contained in:
Parent
eac0895cae
Commit
67a70e6fb3

@@ -94,6 +94,11 @@ these types in Kafka topics.
### Building and running

Kafka Connect is a generic tool to copy data between Kafka and other systems (like Azure IoT Hub). To copy data from
Kafka to Azure IoT Hub, you need to make the code from this repository available to Kafka Connect and configure it to
use the right Azure IoT Hub and Kafka settings. For more details on using Kafka Connect, you can refer to the
[Kafka Connect user guide](http://docs.confluent.io/3.0.0/connect/userguide.html).

#### Pre-requisites

* You need to have an Azure IoT Hub and a set of devices to which you want to send messages. Get started
@@ -102,11 +107,51 @@ these types in Kafka topics.
* You also need a running Apache Kafka v0.10 installation that contains the messages to be sent to the IoT devices in
one or more topics. Get started with Kafka [here](http://docs.confluent.io/3.0.0/quickstart.html).

* To insert the messages for the IoT devices in the Kafka topic, you can use a KafkaProducer. You can read more on
how to use the KafkaProducer [here](http://docs.confluent.io/3.0.0/clients/producer.html).
The steps to insert messages in Kafka topics and to run the Sink connector depend on whether you are using the
[Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs/) along with the Confluent platform. So please
follow the right set of steps below, depending on your choice.

#### Steps when using Schema Registry

When using the [Schema Registry](http://docs.confluent.io/3.0.0/schema-registry/docs/) along with the
[Confluent Platform](http://docs.confluent.io/3.0.0/platform.html), messages are inserted in the Kafka topic as Avro records,
which include the schema of the messages.
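
As an illustration only (not part of this commit), here is a sketch of how such a record's schema could be built with Avro's SchemaBuilder. The field names are assumptions inferred from the sample message used later in this README; the authoritative schema is the one the connector defines.

```java
// Hypothetical sketch: an Avro schema matching the sample C2D message fields in this README.
import org.apache.avro.SchemaBuilder

val c2dMessageSchema = SchemaBuilder
  .record("C2DMessage")
  .fields()
  .requiredString("deviceId")
  .requiredString("message")
  .requiredString("messageId")
  .optionalString("expiryTime") // optional, since not every message carries an expiry
  .endRecord()

println(c2dMessageSchema.toString(true)) // pretty-print the schema as JSON
```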

1. Build the source code after cloning this repository using the following command, which will generate a jar file with
all the dependencies embedded in it.
```
sbt assembly
```
Alternatively, you can directly download the jar file for Kafka Connect IoT Hub from
[here](https://github.com/Azure/toketi-kafka-connect-iothub/releases/download/v0.6/kafka-connect-iothub-assembly_2.11-0.5.jar).

2. Place the jar file in the Kafka installation libs folder (usually under KAFKA_HOME/libs).
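
For example, a minimal sketch assuming the jar is in the current directory and KAFKA_HOME is set (adjust the file name to the version you built or downloaded):
```
cp kafka-connect-iothub-assembly_2.11-0.5.jar $KAFKA_HOME/libs/
```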

3. Update the config file "[connect-iothub-sink.properties](connect-iothub-sink.properties)" with the appropriate
values as described in the section above. Place the file "connect-iothub-sink.properties" in the Kafka
installation config folder (usually under KAFKA_HOME/etc).
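
For reference, a rough sketch of what that file contains. The property names below are taken from this repository's sample config and should be double-checked against connect-iothub-sink.properties; the values are placeholders:
```
connector.class=com.microsoft.azure.iot.kafka.connect.sink.IotHubSinkConnector
name=AzureIotHubSinkConnector
tasks.max=1
topics=<your-kafka-topic>
IotHub.ConnectionString=<your-iothub-service-connection-string>
```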

4. Update the Kafka Connect configuration file ("etc/kafka/connect-standalone.properties" or
"etc/kafka/connect-distributed.properties") to point to the Kafka bootstrap servers.

5. Make sure the Kafka server, Zookeeper, and Schema Registry are running, as described
[here](http://docs.confluent.io/3.0.0/quickstart.html).

6. Start the Kafka sink connector in
[standalone mode](http://docs.confluent.io/3.0.0/connect/userguide.html#standalone-worker-configuration) to read
messages from the Kafka topic and send them to the IoT devices:

```
bin/connect-standalone.sh config/connect-standalone.properties config/connect-iothub-sink.properties
```
For distributed mode, the connector configuration stays the same. For the detailed steps on how to do this, please
follow the
[Confluent Kafka Connect documentation](http://docs.confluent.io/3.0.0/connect/userguide.html#distributed-worker-configuration)
on this topic.
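
As an illustration, in distributed mode the same settings can be submitted to a running Connect worker over its REST API. A sketch, where the connector name, topic, and property names mirror the sample config above and are placeholders:
```
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "iothub-sink",
  "config": {
    "connector.class": "com.microsoft.azure.iot.kafka.connect.sink.IotHubSinkConnector",
    "tasks.max": "1",
    "topics": "<your-kafka-topic>",
    "IotHub.ConnectionString": "<your-iothub-service-connection-string>"
  }
}'
```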

7. Insert messages to be sent to the IoT devices in the Kafka topic as Avro records. One way you can do that is using a
[KafkaProducer](http://docs.confluent.io/3.0.0/clients/producer.html). Here is some sample code to send such messages to
a Kafka topic in the right format.

```java
val props = new Properties()
@@ -136,14 +181,10 @@ the sample code to send such messages to a Kafka topic in the right format.
producer.send(producerRecord)
```

#### Steps when not using Schema Registry

If you are using standard Apache Kafka without the Schema Registry integration, messages are inserted in Kafka
topics as JSON strings, which are then deserialized by the Sink connector.

1. Build the source code after cloning this repository using the following command, which will generate a jar file with
all the dependencies embedded in it.
@@ -159,10 +200,13 @@ Alternatively, you can directly download the jar file for Kafka Connect IoT Hub
values as described in the section above. Place the file "connect-iothub-sink.properties" in the Kafka
installation config folder (usually under KAFKA_HOME/config).

4. Make the following updates to the Kafka Connect configuration file ("config/connect-standalone.properties" or
"config/connect-distributed.properties"):
   * Update bootstrap.servers to point to the Kafka server.
   * Update key.converter and value.converter to use org.apache.kafka.connect.storage.StringConverter (instead of the
     default org.apache.kafka.connect.json.JsonConverter), as shown in the sketch below.
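
A sketch of the resulting converter settings in that file (other entries omitted; the bootstrap server address is a placeholder):
```
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```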

5. Make sure the Kafka server and Zookeeper are running, as described [here](https://kafka.apache.org/documentation#quickstart).

6. Start the Kafka sink connector in
[standalone mode](http://docs.confluent.io/3.0.0/connect/userguide.html#standalone-worker-configuration) to read
@@ -176,6 +220,25 @@ follow the
[Confluent Kafka Connect documentation](http://docs.confluent.io/3.0.0/connect/userguide.html#distributed-worker-configuration)
on this topic.

7. Insert messages to be sent to the IoT devices in the Kafka topic as JSON strings. One way you can do that is using a
[KafkaProducer](https://kafka.apache.org/documentation/#producerapi). Here is some sample code to send such messages to
a Kafka topic in the right format.

```java
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val topic = "<your-kafka-topic>" // placeholder: the topic the sink connector reads from

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("acks", "all")
props.put("retries", "0")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

val message =
  """{"messageId":"msg1","message":"Turn On","deviceId":"device1-123456","expiryTime":"2017-01-19T19:25:50Z"}"""
val producerRecord = new ProducerRecord[String, String](topic, "device1-123456", message)
producer.send(producerRecord)
```
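
Optionally, to verify the messages landed in the topic before the connector picks them up, you can dump the topic with the console consumer (the topic name is a placeholder):
```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <your-kafka-topic> --from-beginning
```
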
## Future work

@@ -17,6 +17,9 @@ import org.apache.kafka.connect.sink.{SinkRecord, SinkTask}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class IotHubSinkTask extends SinkTask with LazyLogging with JsonSerialization {
@@ -49,6 +52,7 @@ class IotHubSinkTask extends SinkTask with LazyLogging with JsonSerialization {
val c2DMessage = C2DMessageConverter.validateSchemaAndGetMessage(record)
this.sendMessage(c2DMessage)
}
logger.info(s"Started tasks to send ${records.size()} messages to devices.")
}
}
} else {
@@ -65,7 +69,7 @@ class IotHubSinkTask extends SinkTask with LazyLogging with JsonSerialization {
if (c2DMessage.expiryTime.isDefined) {
message.setExpiryTimeUtc(c2DMessage.expiryTime.get)
}
logger.debug(s"Sending Message to Device - $message")
this.sendMessageFutures += this.messageSender.get.sendMessage(c2DMessage.deviceId, message)
}
@@ -74,9 +78,26 @@ class IotHubSinkTask extends SinkTask with LazyLogging with JsonSerialization {
this.waitForAllMessages()
}

// Wait for pending tasks to complete. If some of the tasks take too long, stop waiting on the remaining tasks to
// keep Kafka Connect from timing out (the default timeout is 30 seconds).
private def waitForAllMessages(): Unit = {
logger.info("Waiting for all send message tasks to complete")
try {
val waitJob = Future {
CompletableFuture.allOf(this.sendMessageFutures: _*).join()
}
Await.result(waitJob, 20.seconds)
} catch {
case tex: TimeoutException ⇒ {
val completedFutures = this.sendMessageFutures.count(f ⇒ f.isDone)
val pendingFutures = this.sendMessageFutures.size - completedFutures

logger.error("Got timeout exception while waiting for send message tasks. Ignoring the pending tasks to keep" +
" Kafka Connect from timing out. " +
s"There are $completedFutures completed tasks and $pendingFutures pending tasks.")
}
}
this.sendMessageFutures = mutable.ArrayBuffer.empty[CompletableFuture[Void]]
logger.info(s"Done waiting for all send message tasks")
}
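
For context, a self-contained sketch (not part of this commit) of the bounded-wait pattern the new code uses: the blocking join on the Java CompletableFutures is wrapped in a Scala Future so Await.result can cap the wait, and a timeout only stops the waiting without cancelling the still-running tasks. All names and the 2-second budget here are illustrative.

```java
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.function.Supplier
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object BoundedWaitDemo extends App {
  // One fast task and one task that deliberately exceeds the wait budget below.
  val fast = CompletableFuture.supplyAsync(new Supplier[String] { def get(): String = "fast" })
  val slow = CompletableFuture.supplyAsync(new Supplier[String] {
    def get(): String = { TimeUnit.SECONDS.sleep(5); "slow" }
  })
  val all = Seq(fast, slow)

  try {
    // Wrap the blocking join in a Scala Future so the wait can be bounded.
    val waitJob = Future { CompletableFuture.allOf(all: _*).join() }
    Await.result(waitJob, 2.seconds)
    println("all tasks completed in time")
  } catch {
    case _: TimeoutException =>
      // The timeout only stops the waiting; pending tasks keep running in the background.
      val done = all.count(_.isDone)
      println(s"timed out: $done of ${all.size} tasks completed; ignoring the rest")
  }
}
```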

@@ -4,7 +4,6 @@

package com.microsoft.azure.iot.kafka.connect.sink.testhelpers

import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, ExecutorService, Executors}