25 KiB
layout | title | author | author-link | date | categories | color | image | excerpt | language | verticals | geolocation | ||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
post | Building an IoT solution for Chongqing Gas Group Corporation with JCBLE | Michael Li | # | 2017-02-11 |
|
blue | images/2016-12-22-JCBLE/feat_team-2.jpg | JCBLE, in cooperation with Microsoft, developed an IoT solution that reduces operating and maintenance costs for civilian natural-gas usage. |
|
|
|
Chongqing Gas Group Corporation wants to deploy next-generation gas meters to millions of family housing units to help monitor gas-meter status and gas usage automatically.
JCBLE, a startup established in 2014, is a vendor of Chongqing Gas. JCBLE offers Internet of Things (IoT) solutions; their key technology is long-distance wireless data transfer.
Normally, the signal from a wireless access point (AP) covers one building floor. But APs using JCBLE technologies can cover more than three building floors and a wider area. That's the reason Chongqing Gas selected JCBLE as their partner.
The pain point for JCBLE is that they are skilled in device and wireless data transfer but not in server-side technologies. Before they met Microsoft, they had only one service, named "NS Service," running on a single Ubuntu Linux computer to receive and decrypt data from sensor devices. They needed a way to scale up device connections and add a message store with high availablity. Microsoft Azure offers the server-side capabilities that JCBLE needs, including IoT Hub, Stream Analytics, SQL Database, and Blob storage.
JCBLE is using a customized version of Linux on their device that they call simply "Gateway." The Gateway is a transparent device. Any endpoint device with sensors issued by JCBLE can connect to any Gateway device without configuration. So the JCBLE wireless solution is very easy to deploy.
For security, data transfer between sensor devices, Gateway devices, and NS Service is encrypted by JCBLE. All devices are logged in the JCBLE database, and NS Service issues a security token to each device. (The encryption methods are confidential.)
The team included the following:
- Shiping Li (李仕平) – CTO, JCBLE
- Yao Zhang (张尧) – Senior Developer, JCBLE
- Michael Li – Technical Evangelist, Microsoft
- Zepeng She – Technical Evangelist, Microsoft
- David Yan – Technical Evangelist, Microsoft
![picture of team working]({{ site.baseurl }}/images/2016-12-22-JCBLE/team-1.jpg)
![picture of team working]({{ site.baseurl }}/images/2016-12-22-JCBLE/team-2.jpg)
Customer profile
Chongqing JCBLE Network Technology Co., Ltd. was founded in May 2014. The company is mainly engaged in wireless IoT with augmented reality (AR) technology in both software and hardware design, as well as research and development. Based on their core technology, they provide AR-based solutions related to building smart cities, intelligent scenic spots, intelligent businesses, and so on.
Problem statement
In the past ten years, a thousand employees of Chongqing Gas would periodically read gas meters. To save operating costs, Chongqing Gas decided to deploy a new generation of gas meters that include IoT functionality.
After the gas meters are deployed, employees won't need to knock on doors to check the meter readings. A few employees in the office will be able to monitor the meters and analyize gas-usage trends, saving ¥30 million in operating costs each year.
By using the JCBLE wireless solution, Chongqing Gas doesn't need to deploy cable to each housing unit. A few JCBLE Gateway devices around the building would save hundreds of millions of yuan for Chongqing Gas. The challenge is that, on the server side, JCBLE can offer only one Linux server for collecting data, with no further data-processing ability.
Per the requirements of Chongqing Gas, each gas meter sends one data packet every 30 seconds. The size of each data packet is less than 1 KB. In a scenario with one million devices, the server side must process 2,880,000,000 data packets per day. A single Linux server simply can't the load. JCBLE needs a way to collect the data from millions of gas meters.
Another challenge is that JCBLE doesn't have the ability to analyize real-time data and historical data. When a malfunctioning gas meter sends abnormal data, JCBLE needs to generate an alert to notify Chongqing Gas employees. And Chongqing Gas needs trend graphs so that they can adjust their natural-gas pressure.
“We have to find an easy way to collect gas-meter data from millions of gas meters. JCBLE is a startup company; we cannot afford payment for a lot of engineers on the server side.” —Shiping Li, CTO, JCBLE
“In cooperation with Microsoft, we want to enable Azure to help us resolve the problems that we are facing.” —Chili Meng, CEO, JCBLE
Communication between the JCBLE Linux server and Gateway devices is double encrypted (AMQP over TLS and JCBLE encryption) and uses a private protocol. The challenge of integrating IoT Hub with the JCBLE on-premises solution is decrypting data sent from the Gateway on the server side. After the data is decrypted, it can be processed by Stream Analytics and other Azure services.
Solution and steps
JCBLE and Microsoft teamed up to create a proof of concept that would simulate the key measurements to be collected from a Gateway device, which will be located on an apartment building. The data is sent from the sensors through the Gateway to an Azure IoT hub, where the data is decrypted by a JCBLE service running on Linux. The decrypted data is saved to Azure Event Hubs and handled by Stream Analytics, where alerts are generated for abnormal events and aggregated data is stored for insight into the overall status of the housing units.
The solution was developed over the course of a few days at a hackfest held at the JCBLE office in Chongqing. The Microsoft DX China team worked with JCBLE on the following tasks:
- Knowledge transfer about Azure deployment (Azure Resource Manager, Storage, Virtual Machines, IoT Hub, Stream Analytics, and so on)
- Add IoT Hub to the front end to provide real-time data transmission from device to cloud
- Create an application, named IoTConnector, that retrieves data and sends it to NS Service for data decryption
- Deploy NS Service behind Azure Internal Load Balancer for best performance
- Use Stream Analytics to analyze real-time data, generate alerts, and put data into various locations
“It's amazing that Azure can really help us solve the problems we are facing. The Microsoft team is working with us, coding with us, and coaching us how to set up Azure services. It's really helpful.” —Shiping Li
The overall architecture is as follows:
![architecture diagram]({{ site.baseurl }}/images/2016-12-22-JCBLE/JCBLE-IoT-Architecture.png)
The gas meter device sends two packets of encrypted data every minute to IoT Hub through the Gateway device. When a data packet arrives, IoTConnector retrieves it and sends it to NS Service for decryption. After the data is decrypted, IoTConnector saves the data to Event Hubs. Then a Stream Analytics job processes the data. The data includes a device ID that is used by the rest of the system to determine the tenant to whom the device belongs.
The Gateway devices communicates with IoT Hub using the AMQP protocol over a TLS connection. Each device has a unique access token, that is scoped to allow the device to send data to IoT Hub but not to allow any management of the device registry.
The data from the gas meter depends on NS Service for decryption. For best performance, the NS Service application has been changed to a stateless model and deployed behind Azure Internal Load Balancer. IoTConnector can also be deployed as multiple instances. It can retrieve data from one or more specific IoT Hub queues and then send encrypted data to NS Service for data decryption.
The Stream Analytics job has two queries: a raw data query and an alert query. All data is placed in Blob storage as historical data for HDInsight analysis in future. Some alert data will be save to SQL Database.
Technical delivery
IoTConnector connects to queues in IoT Hub and retrieves encrypted data as it comes from the gas meters. To retrieve data, we used Event Hub SDK, which also establishes TCP/UDP endpoint connections as shown in the following code. (In the old architecture, NS Service was designed to receive data from an UDP port for efficiency. JCBLE doesn't want to change NS Service a lot, so IoTConnector uses a UDP port for compatibility.)
public class IoTHubReceiver : IDisposable
{
#region --- Fields ---
private EventHubClient _eventHubClient = null;
private List<EventHubReceiver> _eventHubReceiverList = null;
private int _eventHubPartitionsCount = 0;
private UDPConnector _UDPConnector = null;
private StorageConnector _storageConnector = null;
private ILog _infoLogger = null;
private ILog _errorLogger = null;
#endregion
#region --- Constructors ---
public IoTHubReceiver()
{
try
{
_infoLogger = LogManager.GetLogger(this.GetType());
_errorLogger = LogManager.GetLogger(this.GetType());
_storageConnector = new StorageConnector(SettingsManager.StorageConnectionString);
_infoLogger.Info("Initialize IoTHubReceiver.");
_UDPConnector = new UDPConnector(SettingsManager.NSServer,
SettingsManager.UDPPort, SettingsManager.LocalUDPPort);
_infoLogger.InfoFormat("Initialize IoTHubReceiver.");
_UDPConnector.UDPDataReceived += UDPConnector_UDPDataReceived;
_infoLogger.InfoFormat("Attached event handler to UDPConnector.");
_UDPConnector.Open();
_infoLogger.InfoFormat("UDPConnector Initialized.");
}
catch (Exception ex)
{
_errorLogger.ErrorFormat("IoTHubReceiver constructor got error:{0}",
ex.ToString().Replace("\r\n", " "));
}
}
#endregion
#region --- Methods ---
// Retrieve data from each IoT Hub partition
public async void ReceiveEventDataAsync(DateTime startTime)
{
_infoLogger.Info("Starting retrieve data from IoT Hub......");
_eventHubReceiverList = new List<EventHubReceiver>();
_eventHubClient = EventHubClient.CreateFromConnectionString(SettingsManager.EventHubConnectionString);
_infoLogger.Info("Created EventHubClient from IoT Hub connection string.");
_eventHubPartitionsCount = _eventHubClient.GetRuntimeInformation().PartitionCount;
string[] partitionIds = _eventHubClient.GetRuntimeInformation().PartitionIds;
_infoLogger.Info("Get EventHubClient partition informations.");
try
{
// Create Event Hub receivers by partition ID
foreach (string pId in partitionIds)
{
if (-1 < _eventHubClient.GetPartitionRuntimeInformation(pId).LastEnqueuedSequenceNumber)
{
// Receive data from startTime
_eventHubReceiverList.Add(_eventHubClient.GetDefaultConsumerGroup().CreateReceiver(pId, startTime));
_infoLogger.InfoFormat("{0}: Partition {1} has queued data. Getting ready to retrieve data.", startTime, pId);
}
}
// Retrieve event data from partitions
_infoLogger.InfoFormat("Begin retrieve historical data from IoT Hub.");
List<EventData> dataList = new List<EventData>();
// Retrieve historical data
foreach (EventHubReceiver recv in _eventHubReceiverList)
{
dataList.AddRange(
await recv.ReceiveAsync(int.MaxValue - 1, TimeSpan.FromSeconds(20)));
_infoLogger.InfoFormat("Collected historical data from EventHubReceiver {0}.", recv.Name);
}
long historicalCount = 0;
foreach (EventData data in dataList)
{
// Send event data to NS Service UDP port for decryption
var enqueuedTime = data.EnqueuedTimeUtc.ToLocalTime();
_infoLogger.InfoFormat("Time: {0}\t Data Received: {1}",
enqueuedTime.ToString(), Convert.ToBase64String(data.GetBytes()));
await _UDPConnector.SendAsync(data.GetBytes());
historicalCount++;
}
_infoLogger.InfoFormat("Totally sent {0} messages to UDP port.", historicalCount);
_infoLogger.InfoFormat("End retrieve historical data from IoT Hub.");
List<EventData> currentDataList = null;
long currenctCount = 0;
_infoLogger.InfoFormat("Begin retrieve current data from IoT Hub.");
while (true)
{
currentDataList = new List<EventData>();
foreach (EventHubReceiver recv in _eventHubReceiverList)
{
currentDataList.AddRange(await recv.ReceiveAsync(int.MaxValue, TimeSpan.FromSeconds(1)));
_infoLogger.InfoFormat("Collected current data from EventHubReceiver {0}.", recv.Name);
}
foreach (EventData data in currentDataList)
{
// Send event data to NS Service UDP port for decryption
var enqueuedTime = data.EnqueuedTimeUtc.ToLocalTime();
await _UDPConnector.SendAsync(data.GetBytes());
currenctCount++;
}
_infoLogger.InfoFormat("This loop has sent {0} messages to UDP port.", currenctCount);
// clear all.
currenctCount = 0;
currentDataList = null;
}
}
catch (Exception ex)
{
_errorLogger.ErrorFormat("ReceiveEventDataAsync got an error when retrieve data from IoT Hub:{0}",
ex.ToString().Replace("\r\n", " "));
try
{
_infoLogger.InfoFormat("Now reconnecting IoT Hub.");
// If IoT Hub disconnected then reconnect
await CloseAsync();
ReceiveEventDataAsync(DateTime.Now);
}
catch
{
}
}
_infoLogger.InfoFormat("End retrieve current data from IoT Hub.");
}
public async Task CloseAsync()
{
try
{
// Close EventHubReceiver
foreach (EventHubReceiver recv in _eventHubReceiverList)
{
await recv.CloseAsync();
}
_eventHubReceiverList.Clear();
_infoLogger.InfoFormat("EventHubReceiver has been closed.");
// Close EventHubClient
await _eventHubClient.CloseAsync();
_infoLogger.InfoFormat("EventHubClient has been closed.");
// Close UDP socket
// _UDPConnector.Close();
_infoLogger.InfoFormat("UDPConnector has been closed.");
}
catch (Exception ex)
{
_errorLogger.ErrorFormat("IoTHubReceiver closing got an error: {0}",
ex.ToString().Replace("\r\n", " "));
}
}
#endregion
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
protected async virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
await this.CloseAsync();
}
disposedValue = true;
}
}
// This code added to correctly implement the disposable pattern.
void IDisposable.Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
GC.SuppressFinalize(this);
}
#endregion
}
When the encrypted data has been decrypted, it is sent back from NS Service and written to Event Hub by IoTConnector:
private void OnTCPDataReceived(IAsyncResult ar)
{
try
{
_infoLogger.Info("TCP port establish connection with NS server.");
_tcpClient = _tcpListener.EndAcceptTcpClient(ar);
byte[] buffer = new byte[2048];
NetworkStream ns = _tcpClient.GetStream();
string json = "";
while (_isRunnning)
{
// reset buffer.
for (int i = 0; i < 2048; i++)
buffer[i] = 0;
ns.ReadAsync(buffer, 0, 2048).Wait();
int flag = 0;
for (int i = 0; i < 2048; i++)
{
if(buffer[i]!=0)
{
flag = 1;
break;
}
}
if (flag > 0)
{
_infoLogger.InfoFormat("TCP port received data : {0}", Convert.ToBase64String(buffer));
List<byte> lbuffer = new List<byte>();
for (int i = 0; i < 2048; i++)
{
if(buffer[i] != 0)
{
lbuffer.Add(buffer[i]);
}
}
byte[] buff = new byte[lbuffer.Count];
for (int i = 0; i < lbuffer.Count; i++)
{
buff[i] = lbuffer[i];
}
// buffer => MeterDataEntry
json = Encoding.Default.GetString(buff);
MeterDataEntry meterData = new MeterDataEntry();
meterData = JsonConvert.DeserializeObject<MeterDataEntry>(json);
FinalDataEntry finalData = new FinalDataEntry();
finalData.originData = meterData;
if (meterData != null && meterData.app != null)
{
string dtime = meterData.app.gwrx[0].time;
string pl = meterData.app.userdata.payload;
int tail = 4 - (pl.Length) % 4;
if (tail < 4)
{
for (int k = 0; k < tail; k++)
pl += "=";
}
byte[] payload = Convert.FromBase64String(pl);
Payload payloadData = new Payload(
(int)payload[0],
(int)payload[1],
BitConverter.ToInt16(payload.Skip(2).Take(4).ToArray(), 0) / 1000.0f,
BitConverter.ToInt16(payload.Skip(6).Take(4).ToArray(), 0) / 1000.0f,
BitConverter.ToInt16(payload.Skip(10).Take(4).ToArray(), 0) / 1000.0f,
BitConverter.ToInt16(payload.Skip(14).Take(2).ToArray(), 0),
Encoding.ASCII.GetString(payload.Skip(16).Take(4).ToArray()),
dtime);
finalData.userData = payloadData;
string finalData_json = JsonConvert.SerializeObject(finalData);
_eventhubSender.SendAsync(Encoding.UTF8.GetBytes(finalData_json)).Wait();
_infoLogger.InfoFormat("Buffer data : {0} has been written to Event Hub.", Convert.ToBase64String(buffer));
}
}
Thread.Sleep(50);
}
}
catch (Exception ex)
{
_errorLogger.ErrorFormat("Handling TCP received data encounter error: {0}",
ex.ToString().Replace("\r\n", " "));
// Reconnect
try
{
Close();
Open();
}
catch
{
}
}
}
The decrypted data from NS Service looks like this:
{
"originData": {
"gw": null,
"mote": null,
"app": {
"moteeui": "ffffffffffffff01",
"dir": "up",
"userdata": {
"seqno": 0,
"port": 1,
"payload": "AgEAD0JAAA7w4wAAUV0ABHYxLjA"
},
"motetx": {
"freq": 481.5,
"mode": null,
"datr": "SF12BW125",
"codr": "4/5",
"adr": false
},
"gwrx": [{
"eui": "fffe020100000011",
"time": "2016-12-21T03:24:34Z",
"timefromgateway": false,
"chan": 4,
"rfch": 0,
"rssi": -13,
"lsnr": 9
}]
}
},
"userData": {
"type": 2,
"code": 1,
"bought": 3.84,
"left": 3.584,
"used": 0.0,
"status": 1024,
"version": "v1.0",
"time": "2016-12-21T03:24:34Z"
}
}
When decrypted data arrives in Event Hub, it goes into Stream Analytics. To save raw data to Blob storage, we use the following SQL statements:
SELECT
originData.app.moteeui AS [DeviceID],
userData.[time] AS [DateTime],
userData.[type] AS [MeterType],
userData.[code] AS [FactoryCode],
userData.[bought] AS [Bought],
userData.[left] AS [Remaining],
userData.[used] AS [TotalUsed],
userData.[status] AS [MeterStatus],
userData.[version] AS [MeterSoftwareVer]
INTO BlobOutput
FROM EventHubInput
To generate alerts, we focus on gas meters that read less than 5 cubic meters, as shown here:
SELECT
originData.app.moteeui AS [DeviceID],
userData.[time] AS [DateTime],
userData.[type] AS [MeterType],
userData.[code] AS [FactoryCode],
userData.[bought] AS [Bought],
userData.[left] AS [Remaining],
userData.[used] AS [TotalUsed],
userData.[status] AS [MeterStatus],
userData.[version] AS [MeterSoftwareVer]
INTO SQLOutput
FROM EventHubInput
WHERE Remaining < 5.0
Conclusion
The proof of concept is now complete and deployed into the JCBLE Azure subscription. JCBLE is now working with Chongqing Gas to build a small physical network to verify the whole solution. Learnings from the pilot will be integrated into other JCBLE solutions that will become available for other JCBLE customers.
“It's happy to work with Microsoft. In these days we learned a lot of knowledge about Azure services. Azure really helped us resolve the problems we are facing. IoT Hub can help us scale up the ability of device connecting, and Stream Analytics can analyze real-time data without development.” —Shiping Li
Additional resources
-
To cross-compile the IoT Hub SDK on their Gateway devices, JCBLE referenced Cross Compiling the Azure IoT Hub SDK.
-
For interoperability with Azure Event Hubs, we referenced Get started sending messages to Event Hubs
-
For Stream Analytics query syntax, see Stream Analytics Query Language Reference
-
For more information on JCBLE technologies, see this news release: JCBLE innovative JuRa protocol for wireless transmission