Azure IoTHub to Cassandra connector
Azure IoTHub to Cassandra is a connector that transfers data in real time from Azure IoT Hub to Apache Cassandra. Every message sent from a device connected to Azure IoT Hub is automatically copied into one or more Cassandra tables.
Azure IoT Hub is a service used to connect thousands to millions of devices to the Azure cloud. Each device can send telemetry and receive messages and commands from the cloud.
Apache Cassandra is a highly scalable, fault tolerant, open source database, providing high availability and replication across datacenters. The database is organized in keyspaces and tables.
IoTHub to Cassandra uses a reactive stream with asynchronous back pressure to transfer IoT messages. Messages can be filtered and mapped to multiple keyspaces and tables. New tables can be added through the configuration, or at runtime using the included web service.
How to test the connector in 5 minutes
- Install Docker
- Create an Azure IoT Hub
- Download docker-compose.yaml
- Download and edit setup-env-vars.sh (or setup-env-vars.bat on Windows) to store your hub connection settings
- Run setup-env-vars.sh (or setup-env-vars.bat on Windows)
- Run docker-compose up
The last command downloads the Docker image publicly available [on Docker Hub](https://hub.docker.com/r/toketi/iothub-to-cassandra) and starts the connector, logging events to the console.
You should now have two containers running. Cassandra tables can be queried on port 9042, and IoTHub2Cassandra has a web service listening on port 9000.
Optionally:
- Simulate messages sent to your Azure IoT Hub
- See the messages flowing to Cassandra:
docker exec -it toketiiothubtocassandra_cassandra_1 cqlsh -C -e \
'SELECT COUNT(*) FROM mykeyspace.full_log'
To change the configuration used by Docker Compose, you can edit docker_compose_config_demo.conf, which is referenced in docker-compose.yaml.
Running the connector
Before running the service, make sure to prepare:
- an Azure IoT Hub from which to read the data. We recommend storing the settings in environment variables, by editing and executing setup-env-vars.sh (or setup-env-vars.bat on Windows).
- a configuration file with the information needed to connect to Azure IoT Hub and Cassandra (the configuration file included in the repository references the environment variables set in the previous step).
The Azure IoT Hub documentation can help with creating the hub and finding the connection settings.
Running the service as a Docker container
For a quick test we suggest using Docker Compose, which takes care of starting an instance of Apache Cassandra. Just download docker-compose.yaml to your workstation and (from the same folder) run:
docker-compose up
This will automatically:
- start Cassandra
- start the IoT Hub to Cassandra service
- connect to your Azure IoT hub
- start streaming messages (depending on the configured tables)
If you prefer storing events in a different Cassandra instance, you can start the Docker image publicly available on Docker Hub by running the following command. The command needs a configuration file; you can use the one in the repository (see the docs folder for examples).
docker run -it -v $(pwd)/docs:/config -e CONFIG_FILE=/config/application_config_sample.conf \
-e IOTHUB_EVENTHUB_NAME=$IOTHUB_EVENTHUB_NAME -e IOTHUB_EVENTHUB_ENDPOINT=$IOTHUB_EVENTHUB_ENDPOINT \
-e IOTHUB_EVENTHUB_PARTITIONS=$IOTHUB_EVENTHUB_PARTITIONS -e IOTHUB_ACCESS_POLICY=$IOTHUB_ACCESS_POLICY \
-e IOTHUB_ACCESS_KEY=$IOTHUB_ACCESS_KEY \
-p 9000:9000 toketi/iothub-to-cassandra
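The docker run command above forwards five environment variables into the container. If one of them is unset in your shell, the container receives an empty value and the connector will most likely fail to connect. As a minimal sketch (the helper name missing_iothub_vars is ours and not part of the project), a small Python check run before docker run could look like this:

```python
import os

# Variables passed with -e in the docker run command above.
REQUIRED_VARS = [
    "IOTHUB_EVENTHUB_NAME",
    "IOTHUB_EVENTHUB_ENDPOINT",
    "IOTHUB_EVENTHUB_PARTITIONS",
    "IOTHUB_ACCESS_POLICY",
    "IOTHUB_ACCESS_KEY",
]


def missing_iothub_vars(env=None):
    """Return the required variable names that are unset or empty in env.

    Hypothetical helper: env defaults to the current process environment.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling missing_iothub_vars() with no arguments inspects the current environment; an empty list means all five settings are present.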
Configuration
The following is a minimal configuration file example. Note how values can be replaced with the ${?XYZ} syntax, which retrieves values from the environment (e.g. to avoid visible passwords).
iothub {
hubName = my-iothub-one
hubEndpoint = sb://iothub-ns-myioth-75186-9fb862f912.servicebus.windows.net/
hubPartitions = 4
accessPolicy = service
accessKey = ${?IOTHUB_ACCESS_KEY}
accessHostName = my-iothub-one.azure-devices.net
}
cassandra {
cluster = "localhost"
}
http {
interface = "0.0.0.0"
port = 9000
}
iothub2cassandra {
configTable: "tables"
configKeyspace: "azureiothub2cassandra"
}
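The ${?XYZ} form is an optional substitution: when the environment variable is not set, the setting is left undefined instead of causing an error. The following Python sketch only mimics that behaviour on a flat dictionary to make it concrete. It is not how the connector parses its configuration, and resolve_optional is a hypothetical helper.

```python
import re

# Matches a value that consists solely of an optional substitution, e.g. "${?IOTHUB_ACCESS_KEY}".
_OPTIONAL = re.compile(r"^\$\{\?([A-Za-z_][A-Za-z0-9_]*)\}$")


def resolve_optional(settings, env):
    """Resolve "${?NAME}" values in a flat dict against env.

    Settings whose variable is missing from env are dropped,
    mirroring how an optional substitution leaves the field undefined.
    """
    resolved = {}
    for key, value in settings.items():
        match = _OPTIONAL.match(value) if isinstance(value, str) else None
        if match is None:
            resolved[key] = value            # literal value, kept as-is
        elif match.group(1) in env:
            resolved[key] = env[match.group(1)]
        # otherwise: variable not set, so the setting is skipped
    return resolved
```

For example, resolving {"accessKey": "${?IOTHUB_ACCESS_KEY}"} against an environment without IOTHUB_ACCESS_KEY yields an empty dictionary, so a missing variable shows up as a missing setting rather than as an empty string.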
A more interesting configuration is available here, with other settings and comments describing their use case. Here's a comprehensive list:
Setting | Description | Example |
---|---|---|
akka.loglevel | Logging level | DEBUG |
cassandra.cluster | Cluster to connect to | 192.168.0.10 |
iothub2cassandra.http.interface | Web service IP | 0.0.0.0 |
iothub2cassandra.http.port | Web service port | 9000 |
iothub2cassandra.configTable | Table where to store the list of tables to populate | tables |
iothub2cassandra.configKeyspace | Keyspace where to store the service configuration | azureiothub2cassandra |
iothub2cassandra.tables | Optional list of tables to populate | array, see this sample |
iothub.hubPartitions | Number of partitions in your Azure IoT Hub | 4 |
iothub.hubName | Event Hub-compatible name | my-iothub-one |
iothub.hubEndpoint | Event Hub-compatible endpoint | sb://iothub-ns-myioth-75186-9fb862f912.servicebus.windows.net/ |
iothub.accessPolicy | Access policy to use | service |
iothub.accessKey | Access policy secret key | 6XdRSFB9H61f+N3uOdBJiKwzeqbZUj1K//T2jFyewN4= |
iothub.accessHostName | Access policy HostName | my-iothub-one.azure-devices.net |
IoT Hub to Cassandra is built upon the Azure IoTHubReact library, which has some extra configuration settings. For a complete description, please see the IoT Hub React reference file included in its repository.
Setting | Description |
---|---|
iothub.* | Connection details to connect to Azure IoT Hub |
iothub-stream.* | Streaming details, e.g. batch size and timeout |
iothub-checkpointing.* | Checkpointing details, i.e. how to keep the current stream position in case of restart |
Defining the Cassandra tables
IoT Hub to Cassandra can stream telemetry to different tables with different schemas, filtering messages and extracting values from telemetry (in JSON format).
Tables can be added in two different ways:
- through the configuration file
- through the included web service
A table specification defines:
- Where to write the data
- The schema of each table, i.e. the column names and the clustering and partition keys
- How to map values from the incoming telemetry to the table columns
- Optional filters, e.g. to filter messages by message type and/or device ID
Tables added in the configuration file
Multiple tables can be added in the configuration file. These tables will be created as soon as the service starts, and the service will immediately start populating them, reading the telemetry stream from Azure IoT Hub.
Here's an example of two distinct tables. The first will contain a copy of all the incoming messages. The second table will instead contain only "temperature" messages sent by the "livingRoom" device, extracting the temperature value.
Note: in the second table, the source payload must be in JSON format, and the message type must be set by the device (adding a "$$messageType" custom property).
...
tables: [
{
"table": "full_log",
"keyspace": "mykeyspace",
"columns": [
{"name": "messageType", "type": "text", "source": "MessageType", "key": "Partition"},
{"name": "id", "type": "text", "source": "MessageId", "key": "Partition"},
{"name": "time", "type": "timestamp", "source": "Received", "key": "Clustering"},
{"name": "device", "type": "text", "source": "DeviceId", "key": "Clustering"},
{"name": "content", "type": "text", "source": "Content"}
]
},
{
"table": "livingRoomTemperature",
"keyspace": "mykeyspace",
"contentType": "json",
"filters": {
"messageType": "temperature",
"deviceId": "livingRoom"
},
"columns": [
{"name": "time", "type": "timestamp", "source": "Received", "key": "Partition"},
{"name": "device", "type": "text", "source": "DeviceId", "key": "Partition"},
{"name": "value", "type": "double", "source": "Content", "sourcePath": "value"}
]
}
]
...
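To relate the "key" attributes above to Cassandra's data model, here is a rough Python sketch that turns a table definition into a CREATE TABLE statement. The exact statement issued by the connector is not documented in this README, so treat the output as an assumption about an equivalent schema; create_table_cql is a hypothetical helper.

```python
def create_table_cql(spec):
    """Build a CQL CREATE TABLE statement from a table definition.

    Columns marked "Partition" form the composite partition key and
    columns marked "Clustering" become clustering columns, in the
    order in which they are listed.
    """
    columns = spec["columns"]
    partition = [c["name"] for c in columns if c.get("key") == "Partition"]
    clustering = [c["name"] for c in columns if c.get("key") == "Clustering"]
    if not partition:
        raise ValueError("a Cassandra table needs at least one partition key")

    column_defs = ", ".join(f'{c["name"]} {c["type"]}' for c in columns)
    primary_key = "(" + ", ".join(partition) + ")"
    if clustering:
        primary_key += ", " + ", ".join(clustering)

    return (
        f'CREATE TABLE IF NOT EXISTS {spec["keyspace"]}.{spec["table"]} '
        f"({column_defs}, PRIMARY KEY ({primary_key}))"
    )
```

Under these assumptions, the full_log definition would correspond to a table whose primary key is ((messageType, id), time, device): rows are distributed by message type and message ID, and ordered by time and device within each partition.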
In the docs folder you can find multiple examples, showing how to define partition keys, clustering keys, column types, and filters, and how to extract values from the message payload.
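As an illustration of how "filters", "source" and "sourcePath" could work together, the following Python sketch maps one incoming message to one table row. It is a simplified model, not the connector's implementation: the message is represented as a plain dictionary keyed by the "source" names, the filter-to-field correspondence is assumed, and "sourcePath" is treated as a single top-level JSON key. map_message is a hypothetical helper.

```python
import json

# Assumed correspondence between filter names and message fields.
FILTER_FIELDS = {"messageType": "MessageType", "deviceId": "DeviceId"}


def map_message(spec, message):
    """Return the row for message, or None if a filter rejects it."""
    for filter_name, expected in spec.get("filters", {}).items():
        if message.get(FILTER_FIELDS[filter_name]) != expected:
            return None

    row = {}
    for column in spec["columns"]:
        value = message[column["source"]]
        if "sourcePath" in column:
            # Extract one field from the JSON payload.
            value = json.loads(value)[column["sourcePath"]]
        row[column["name"]] = value
    return row
```

With the livingRoomTemperature definition, a "temperature" message from the "livingRoom" device whose content is {"value": 21.5} would produce a row with value 21.5, while a message from any other device would be skipped.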
Adding tables using the web service
The list of tables, and how to write the incoming telemetry into these tables, is stored in a configuration table in Cassandra, so you can, for example, restart and redeploy the service without losing this information.
Adding new tables through the configuration file requires redeploying and restarting the service, which is not always possible or desired. IoTHub to Cassandra includes a web service that lets you add new table configurations at runtime, without the need to restart.
The syntax used by the web service is equivalent to the one used in the configuration file seen above. In fact, you can copy and paste a table definition from the configuration file to the web service, and vice versa.
Here's an example of how to add the same full_log table seen above, using the web service:
Add full_log table
Web service request:
POST http://<host>:9000/api/tables
Content-Type: application/json
{
"table": "full_log",
"keyspace": "mykeyspace",
"columns": [
{"name": "messageType", "type": "text", "source": "MessageType", "key": "Partition"},
{"name": "id", "type": "text", "source": "MessageId", "key": "Partition"},
{"name": "time", "type": "timestamp", "source": "Received", "key": "Clustering"},
{"name": "device", "type": "text", "source": "DeviceId", "key": "Clustering"},
{"name": "content", "type": "text", "source": "Content"}
]
}
Response:
200 OK
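The same request can be prepared from a script. The sketch below uses only Python's standard library and builds the request without sending it; build_add_table_request is a hypothetical helper, and the host name is whatever machine runs the connector.

```python
import json
import urllib.request


def build_add_table_request(host, table_spec, port=9000):
    """Build the POST /api/tables request for a table definition."""
    body = json.dumps(table_spec).encode("utf-8")
    return urllib.request.Request(
        url=f"http://{host}:{port}/api/tables",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the returned object to urllib.request.urlopen sends the request; a 200 status on the response corresponds to the "200 OK" shown above.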
Other web service methods
The web service also provides methods to list the configured tables, to start/stop/restart the streaming, and to check the service health status.
Get a list of all the tables
GET http://<host>:9000/api/tables
Start/Stop/Restart the streaming
POST http://<host>:9000/api/streaming/start
POST http://<host>:9000/api/streaming/stop
POST http://<host>:9000/api/streaming/restart
Check service status
GET http://<host>:9000/status
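For scripting, the endpoints listed above can be collected in a small lookup table. This is only a sketch using Python's standard library: the operation names and the build_request helper are our own, and the response bodies are not described in this README, so the code stops at building the request.

```python
import urllib.request

# Method and path of each operation listed above.
OPERATIONS = {
    "list_tables": ("GET", "/api/tables"),
    "start": ("POST", "/api/streaming/start"),
    "stop": ("POST", "/api/streaming/stop"),
    "restart": ("POST", "/api/streaming/restart"),
    "status": ("GET", "/status"),
}


def build_request(host, operation, port=9000):
    """Build (but do not send) the request for one of the operations."""
    method, path = OPERATIONS[operation]
    return urllib.request.Request(f"http://{host}:{port}{path}", method=method)
```

For example, urllib.request.urlopen(build_request("localhost", "status")) would query the health endpoint of a connector running locally.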
Contribute Code
If you plan to contribute, we ask you to sign a CLA (Contribution License Agreement). A friendly bot will remind you about it when you submit a pull request.
If you are sending a pull request, we kindly ask you to check the code style with IntelliJ IDEA, importing the settings from Codestyle.IntelliJ.xml.