зеркало из
1
0
Форкнуть 0

Refactoring the twin feature to enable retries: MQTT Transport

This commit is contained in:
Pierre Cauchois 2018-02-23 19:40:05 -08:00
Родитель 90e269ba71
Коммит d6aff5f476
12 изменённых файлов: 1020 добавлений и 1111 удалений

Просмотреть файл

@ -67,6 +67,8 @@ The `Mqtt` and `MqttWs` constructors initialize a new instance of the MQTT trans
**SRS_NODE_DEVICE_MQTT_18_025: [** If the `Mqtt` constructor receives a second parameter, it shall be used as a provider in place of mqtt.js **]**
**SRS_NODE_DEVICE_MQTT_16_081: [** The `Mqtt` constructor shall subscribe to the `MqttTwinClient` `twinDesiredPropertiesUpdates` and reemit them. **]**
### connect(done)
The `connect` method initializes a connection to an IoT hub.
@ -214,67 +216,25 @@ interface DeviceMethodResponse {
**SRS_NODE_DEVICE_MQTT_16_034: [** The `sendMethodResponse` method shall fail with a `NotConnectedError` if the `MqttBase` object is not connected. **]**
### sendTwinRequest(method, resource, properties, body, done)
### getTwin(done)
The `sendTwinRequest` method sends the given body to the given endpoint on an IoT hub on behalf of the device indicated in the constructor argument.
The `getTwin` method is used to retrieve the device twin.
**SRS_NODE_DEVICE_MQTT_18_001: [** The `sendTwinRequest` method shall call the publish method on `MqttBase`. **]**
**SRS_NODE_DEVICE_MQTT_16_075: [** `getTwin` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected. **]**
**SRS_NODE_DEVICE_MQTT_18_008: [** The `sendTwinRequest` method shall not throw if the `done` callback is falsy. **]**
**SRS_NODE_DEVICE_MQTT_16_076: [** `getTwin` shall call its callback with an error if it fails to connect the transport **]**
**SRS_NODE_DEVICE_MQTT_18_009: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `method` argument is falsy. **]**
**SRS_NODE_DEVICE_MQTT_16_077: [** `getTwin` shall call the `getTwin` method on the `MqttTwinClient` object and pass it its callback. **]**
**SRS_NODE_DEVICE_MQTT_18_010: [** The `sendTwinRequest` method shall throw an `ArgumentError` if the `method` argument is not a string. **]**
### updateTwinReportedProperties
**SRS_NODE_DEVICE_MQTT_18_019: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `resource` argument is falsy. **]**
The `updateTwinReportedProperties` method is used to retrieve the device twin.
**SRS_NODE_DEVICE_MQTT_18_020: [** The `sendTwinRequest` method shall throw an `ArgumentError` if the `resource` argument is not a string. **]**
**SRS_NODE_DEVICE_MQTT_16_078: [** `updateTwinReportedProperties` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected. **]**
**SRS_NODE_DEVICE_MQTT_18_011: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `properties` argument is falsy. **]**
**SRS_NODE_DEVICE_MQTT_16_079: [** `updateTwinReportedProperties` shall call its callback with an error if it fails to connect the transport **]**
**SRS_NODE_DEVICE_MQTT_18_012: [** The `sendTwinRequest` method shall throw an `ArgumentError` if the `properties` argument is not a an object. **]**
**SRS_NODE_DEVICE_MQTT_18_018: [** The `sendTwinRequest` method shall throw an `ArgumentError` if any members of the `properties` object fails to serialize to a string **]**
**SRS_NODE_DEVICE_MQTT_18_013: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `body` argument is falsy. **]**
**SRS_NODE_DEVICE_MQTT_18_022: [** The `propertyQuery` string shall be constructed from the `properties` object. **]**
**SRS_NODE_DEVICE_MQTT_18_023: [** Each member of the `properties` object shall add another 'name=value&' pair to the `propertyQuery` string. **]**
**SRS_NODE_DEVICE_MQTT_18_004: [** If a `done` callback is passed as an argument, The `sendTwinRequest` method shall call `done` after the body has been published. **]**
**SRS_NODE_DEVICE_MQTT_18_021: [** The topic name passed to the publish method shall be $iothub/twin/`method`/`resource`/?`propertyQuery` **]**
**SRS_NODE_DEVICE_MQTT_18_015: [** The `sendTwinRequest` shall publish the request with QOS=0, DUP=0, and Retain=0 **]**
**SRS_NODE_DEVICE_MQTT_18_016: [** If an error occurs in the `sendTwinRequest` method, the `done` callback shall be called with the error as the first parameter. **]**
**SRS_NODE_DEVICE_MQTT_18_024: [** If an error occurs, the `sendTwinRequest` shall use the MQTT `translateError` module to convert the mqtt-specific error to a transport agnostic error before passing it into the `done` callback. **]**
**SRS_NODE_DEVICE_MQTT_18_017: [** If the `sendTwinRequest` method is successful, the first parameter to the `done` callback shall be null and the second parameter shall be a MessageEnqueued object. **]**
**SRS_NODE_DEVICE_MQTT_16_029: [** The `sendTwinRequest` method shall connect the Mqtt connection if it is disconnected. **]**
**SRS_NODE_DEVICE_MQTT_16_031: [** If `sendTwinRequest` is called while `MqttBase` is establishing the connection, it shall wait until the connection is established and then send the twin request. **]**
**SRS_NODE_DEVICE_MQTT_16_036: [** If `sendTwinRequest` is called while `MqttBase` is establishing the connection, and `MqttBase` fails to establish the connection, then `sendTwinRequest` shall fail. **]**
**SRS_NODE_DEVICE_MQTT_16_032: [** If `sendTwinRequest` is called while `MqttBase` is disconnecting, it shall wait until the disconnection is complete and then try to connect again and send the twin request. **]**
**SRS_NODE_DEVICE_MQTT_16_033: [** The `sendTwinRequest` method shall call its callback with an error translated using `translateError` if `MqttBase` fails to connect. **]**
### getTwinReceiver(done)
The `getTwinReceiver` method creates a `MqttTwinReceiver` object for the twin response endpoint and returns it, or returns the existing instance.
**SRS_NODE_DEVICE_MQTT_18_014: [** The `getTwinReceiver` method shall throw an `ReferenceError` if done is falsy **]**
**SRS_NODE_DEVICE_MQTT_18_005: [** The `getTwinReceiver` method shall call the `done` method after it completes **]**
**SRS_NODE_DEVICE_MQTT_18_006: [** If a twin receiver for this endpoint did not previously exist, the `getTwinReceiver` method should return the a new `MqttTwinReceiver` object as the second parameter of the `done` function with null as the first parameter. **]**
**SRS_NODE_DEVICE_MQTT_18_007: [** If a twin receiver for this endpoint previously existed, the `getTwinReceiver` method should return the preexisting `MqttTwinReceiver` object as the second parameter of the `done` function with null as the first parameter. **]**
**SRS_NODE_DEVICE_MQTT_16_080: [** `updateTwinReportedProperties` shall call the `updateTwinReportedProperties` method on the `MqttTwinClient` object and pass it its callback. **]**
### enableC2D
@ -300,17 +260,17 @@ The `getTwinReceiver` method creates a `MqttTwinReceiver` object for the twin re
**SRS_NODE_DEVICE_MQTT_16_053: [** `enableMethods` shall call its callback with an `Error` if subscribing to the topic fails. **]**
### enableTwin
### enableTwinDesiredPropertiesUpdates
**SRS_NODE_DEVICE_MQTT_16_057: [** `enableTwin` shall connect the MQTT connection if it is disconnected. **]**
**SRS_NODE_DEVICE_MQTT_16_057: [** `enableTwinDesiredPropertiesUpdates` shall connect the MQTT connection if it is disconnected. **]**
**SRS_NODE_DEVICE_MQTT_16_058: [** `enableTwin` shall calls its callback with an `Error` object if it fails to connect. **]**
**SRS_NODE_DEVICE_MQTT_16_058: [** `enableTwinDesiredPropertiesUpdates` shall calls its callback with an `Error` object if it fails to connect. **]**
**SRS_NODE_DEVICE_MQTT_16_059: [** `enableTwin` shall subscribe to the MQTT topics for twins. **]**
**SRS_NODE_DEVICE_MQTT_16_059: [** `enableTwinDesiredPropertiesUpdates` shall subscribe to the MQTT topics for twins. **]**
**SRS_NODE_DEVICE_MQTT_16_060: [** `enableTwin` shall call its callback with no arguments when the `SUBACK` packet is received. **]**
**SRS_NODE_DEVICE_MQTT_16_060: [** `enableTwinDesiredPropertiesUpdates` shall call its callback with no arguments when the `SUBACK` packet is received. **]**
**SRS_NODE_DEVICE_MQTT_16_061: [** `enableTwin` shall call its callback with an `Error` if subscribing to the topics fails. **]**
**SRS_NODE_DEVICE_MQTT_16_061: [** `enableTwinDesiredPropertiesUpdates` shall call its callback with an `Error` if subscribing to the topics fails. **]**
### disableC2D
@ -332,12 +292,12 @@ The `getTwinReceiver` method creates a `MqttTwinReceiver` object for the twin re
**SRS_NODE_DEVICE_MQTT_16_046: [** `disableMethods` shall call its callback with an `Error` if an error is received while unsubscribing. **]**
### disableTwin
### disableTwinDesiredPropertiesUpdates
**SRS_NODE_DEVICE_MQTT_16_062: [** `disableTwin` shall call its callback immediately if the MQTT connection is already disconnected. **]**
**SRS_NODE_DEVICE_MQTT_16_062: [** `disableTwinDesiredPropertiesUpdates` shall call its callback immediately if the MQTT connection is already disconnected. **]**
**SRS_NODE_DEVICE_MQTT_16_063: [** `disableTwin` shall unsubscribe from the topics for twin messages. **]**
**SRS_NODE_DEVICE_MQTT_16_063: [** `disableTwinDesiredPropertiesUpdates` shall unsubscribe from the topics for twin messages. **]**
**SRS_NODE_DEVICE_MQTT_16_064: [** `disableTwin` shall call its callback with no arguments when the `UNSUBACK` packet is received. **]**
**SRS_NODE_DEVICE_MQTT_16_064: [** `disableTwinDesiredPropertiesUpdates` shall call its callback with no arguments when the `UNSUBACK` packet is received. **]**
**SRS_NODE_DEVICE_MQTT_16_065: [** `disableTwin` shall call its callback with an `Error` if an error is received while unsubscribing. **]**
**SRS_NODE_DEVICE_MQTT_16_065: [** `disableTwinDesiredPropertiesUpdates` shall call its callback with an `Error` if an error is received while unsubscribing. **]**

Просмотреть файл

@ -0,0 +1,95 @@
# MqttTwinClient Requirements
## Overview
Object used to publish, subscribe and receive twin messages.
## Responsibilities
- subscribe to the necessary topics to achieve twin operations
- format MQTT messages to send twin requests and reported properties updates
- get the full twin using the appropriate topics and request/response mechanism
- receive desired properties updates and signal them to the Mqtt client.
## Non-responsibilities
- connecting/disconnecting the Mqtt connection
- any other feature that is not twin-related
- retries and timeouts.
## Public API
```typescript
class MqttTwinClient extends EventEmitter {
constructor(client: MqttBase);
getTwin(callback: (err?: Error, twin?: TwinProperties) => void): void;
updateTwinReportedProperties(patch: any, callback: (err?: Error) => void): void;
enableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void;
disableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void;
}
```
### constructor(client: MqttBase);
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_18_001: [** The `MqttTwinClient` constructor shall accept a `client` object **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_001: [** The `MqttTwinClient` constructor shall immediately subscribe to the `message` event of the `client` object. **]**
### getTwin(callback: (err?: Error, twin?: TwinProperties) => void): void;
The `getTwin` method retrieves the device twin by publishing a twin request on a topic and receiving the response on another one.
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_002: [** The `getTwin` method shall subscribe to the `$iothub/twin/res/#` topic if not already subscribed. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_009: [** If subscribing to the response topic fails, the callback shall be called with the translated version of the error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_003: [** The `getTwin` method shall publish the request message on the `$iothub/twin/get/?rid=<requestId>` topic using the `MqttBase.publish` method. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_004: [** The `getTwin` method shall publish the request message with QoS=0, DUP=0 and Retain=0. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_005: [** The `requestId` property in the topic querystring should be set to a random unique integer that will be used to identify the response later on. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_006: [** The request message published by the `getTwin` method shall have an empty body. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_007: [** When a message is received on the response topic with an `$rid` property in the query string of the topic matching the one that was sent on the request topic, the `callback` shall be called with a `null` error object and the parsed content of the response message. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_008: [** If an error happen while publishing the request message, the `callback` shall be called with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package. **]**
### updateTwinReportedProperties(patch: any, callback: (err?: Error) => void): void;
The `updateTwinReportedProperties` publishes an update to the reported properties.
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_011: [** The `updateTwinReportedProperties` method shall subscribe to the `$iothub/twin/res/#` topic if not already subscribed. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_012: [** If subscribing to the response topic fails, the callback shall be called with the translated version of the error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_013: [** The `updateTwinReportedProperties` method shall publish the request message on the `$iothub/twin/patch/properties/reported/?rid=<requestId>` topic using the `MqttBase.publish` method. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_014: [** The `updateTwinReportedProperties` method shall publish the request message with QoS=0, DUP=0 and Retain=0. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_015: [** The `requestId` property in the topic querystring should be set to a random unique integer that will be used to identify the response later on. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_016: [** The body of the request message published by the `updateTwinReportedProperties` method shall be a JSON string of the reported properties patch. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_017: [** When a message is received on the response topic with an `$rid` property in the query string of the topic matching the one that was sent on the request topic, the `callback` shall be called with a `null` error object. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_018: [** If an error happen while publishing the request message, the `callback` shall be called with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package. **]**
### enableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void;
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_019: [** The `enableTwinDesiredPropertiesUpdates` shall subscribe to the `$iothub/twin/PATCH/properties/desired/#` topic using the `MqttBase.subscribe` method if it hasn't been subscribed to already. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_020: [** The `enableTwinDesiredPropertiesUpdates` shall call its callback with no arguments if the subscription is successful. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_021: [** if subscribing fails with an error the `enableTwinDesiredPropertiesUpdates` shall call its callback with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package. **]**
### disableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void;
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_022: [** The `disableTwinDesiredPropertiesUpdates` shall unsubscribe from the `$iothub/twin/PATCH/properties/desired/#` topic using the `MqttBase.unsubscribe` method if it hasn't been unsubscribed from already. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_023: [** The `disableTwinDesiredPropertiesUpdates` shall call its callback with no arguments if the unsubscription is successful. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_024: [** if unsubscribing fails with an error the `disableTwinDesiredPropertiesUpdates` shall call its callback with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package. **]**
### twinDesiredPropertiesUpdate event
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_025: [** Once the desired properties update topic has been subscribed to the `MqttTwinClient` shall emit a `twinDesiredPropertiesUpdate` event for messages received on that topic. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_026: [** The argument of the `twinDesiredPropertiesUpdate` event emitted shall be the object parsed from the JSON string contained in the received message. **]**

Просмотреть файл

@ -1,80 +0,0 @@
# MqttTwinReceiver Requirements
## Overview
Object used to subscribe to the Cloud-to-Device messages for Twin
## Example
```javascript
var receiver = new MqttTwinReceiver({ 'client' : client });
receiver.on('response', function(status, requestId, body) {
console.log('Response received for request ' + requestId);
console.log(' status = " + status);
console.log(body);
});
```
## Public API
### Constructor
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_001: [** The `MqttTwinReceiver` constructor shall accept a `client` object **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_002: [** The `MqttTwinReceiver` constructor shall throw `ReferenceError` if the `client` object is falsy **]**
### response event
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_003: [** When a listener is added for the `response` event, the appropriate topic shall be asynchronously subscribed to. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_009: [** The subscribed topic for `response` events shall be '$iothub/twin/res/#' **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_004: [** If there is a listener for the `response` event, a `response` event shall be emitted for each response received. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_005: [** When there are no more listeners for the `response` event, the topic should be unsubscribed **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_006: [** When a `response` event is emitted, the parameter shall be an object which contains `status`, `requestId` and `body` members **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_010: [** The topic which receives the response shall be formatted as '$iothub/twin/res/{status}/?$rid={request id}' **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_015: [** the {status} and {request id} fields in the topic name are required. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_016: [** The {status} and {request id} fields in the topic name shall be strings **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_017: [** The {status} and {request id} fields in the topic name cannot be zero length. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_014: [** Any messages received on topics which violate the topic name formatting shall be ignored. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_011: [** The `status` parameter of the `response` event shall be parsed out of the topic name from the {status} field **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_012: [** The `requestId` parameter of the `response` event shall be parsed out of the topic name from the {request id} field **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_013: [** The `body` parameter of the `response` event shall be the body of the received MQTT message **]**
### post event
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_018: [** When a listener is added to the post event, the appropriate topic shall be asynchronously subscribed to. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_019: [** The subscribed topic for post events shall be $iothub/twin/PATCH/properties/desired/# **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_020: [** If there is a listener for the post event, a post event shal be emitteded for each post message received **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_021: [** When there are no more listeners for the post event, the topic should be unsubscribed. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_022: [** When a post event it emitted, the parameter shall be the body of the message **]**
### error event
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_023: [** If the `error` event is subscribed to, an `error` event shall be emitted if any asynchronous subscribing operations fails. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_024: [** When the `error` event is emitted, the first parameter shall be an error object obtained via the MQTT `translateErrror` module. **]**
### subscribed event
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_025: [** If the `subscribed` event is subscribed to, a `subscribed` event shall be emitted after an MQTT topic is subscribed to. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_028: [** When the `subscribed` event is emitted, the parameter shall be an object which contains an `eventName` field and an `transportObject` field. **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_026: [** When the `subscribed` event is emitted because the response MQTT topic was subscribed, the `eventName` field shall be the string 'response' **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_027: [** When the `subscribed` event is emitted because the post MQTT topic was subscribed, the `eventName` field shall be the string 'post' **]**
**SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_029: [** When the subscribed event is emitted, the `transportObject` field shall contain the object returned by the library in the subscription response. **]**

Просмотреть файл

@ -7,13 +7,13 @@ import * as URL from 'url';
import * as machina from 'machina';
import { results, errors, Message, X509, AuthenticationProvider, AuthenticationType, TransportConfig } from 'azure-iot-common';
import { DeviceMethodResponse, Client, X509AuthenticationProvider, SharedAccessSignatureAuthenticationProvider } from 'azure-iot-device';
import { DeviceMethodResponse, Client, X509AuthenticationProvider, SharedAccessSignatureAuthenticationProvider, TwinProperties } from 'azure-iot-device';
import { EventEmitter } from 'events';
import * as util from 'util';
import * as dbg from 'debug';
const debug = dbg('azure-iot-device-mqtt:Mqtt');
import { MqttBase, translateError } from 'azure-iot-mqtt-base';
import { MqttTwinReceiver } from './mqtt_twin_receiver';
import { MqttTwinClient } from './mqtt_twin_client';
// tslint:disable-next-line:no-var-requires
const packageJson = require('../package.json');
@ -39,7 +39,7 @@ export class Mqtt extends EventEmitter implements Client.Transport {
*/
protected _authenticationProvider: AuthenticationProvider;
private _mqtt: MqttBase;
private _twinReceiver: MqttTwinReceiver;
private _twinClient: MqttTwinClient;
private _topicTelemetryPublish: string;
private _topicMessageSubscribe: string;
private _topicMethodSucbscribe: string;
@ -85,7 +85,9 @@ export class Mqtt extends EventEmitter implements Client.Transport {
this._mqtt.on('message', this._dispatchMqttMessage.bind(this));
this._twinReceiver = new MqttTwinReceiver(this._mqtt);
this._twinClient = new MqttTwinClient(this._mqtt);
/*Codes_SRS_NODE_DEVICE_MQTT_16_081: [The `Mqtt` constructor shall subscribe to the `MqttTwinClient` `twinDesiredPropertiesUpdates` and reemit them.]*/
this._twinClient.on('twinDesiredPropertiesUpdate', (patch) => this.emit('twinDesiredPropertiesUpdate', patch));
this._fsm = new machina.Fsm({
initialState: 'disconnected',
@ -120,28 +122,30 @@ export class Mqtt extends EventEmitter implements Client.Transport {
}
});
},
sendTwinRequest: (topic, body, sendTwinRequestCallback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_029: [The `sendTwinRequest` method shall connect the Mqtt connection if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Tests_SRS_NODE_DEVICE_MQTT_16_033: [The `sendTwinRequest` method shall call its callback with an error translated using `translateError` if `MqttBase` fails to connect.]*/
sendTwinRequestCallback(err);
} else {
this._fsm.handle('sendTwinRequest', topic, body, sendTwinRequestCallback);
}
});
},
updateSharedAccessSignature: (sharedAccessSignature, callback) => { callback(null, new results.SharedAccessSignatureUpdated(false)); },
sendMethodResponse: (response, callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_034: [The `sendMethodResponse` method shall fail with a `NotConnectedError` if the `MqttBase` object is not connected.]*/
callback(new errors.NotConnectedError('device disconnected: the service already considers the method has failed'));
},
getTwinReceiver: (callback) => {
getTwin: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_075: [`getTwin` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_076: [`getTwin` shall call its callback with an error if it fails to connect the transport]*/
callback(err);
} else {
this._fsm.handle('getTwinReceiver', callback);
this._fsm.handle('getTwin', callback);
}
});
},
updateTwinReportedProperties: (patch, callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_078: [`updateTwinReportedProperties` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_079: [`updateTwinReportedProperties` shall call its callback with an error if it fails to connect the transport]*/
callback(err);
} else {
this._fsm.handle('updateTwinReportedProperties', patch, callback);
}
});
},
@ -167,17 +171,6 @@ export class Mqtt extends EventEmitter implements Client.Transport {
}
});
},
enableTwin: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_057: [`enableTwin` shall connect the MQTT connection if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_058: [`enableTwin` shall calls its callback with an `Error` object if it fails to connect.]*/
callback(err);
} else {
this._fsm.handle('enableTwin', callback);
}
});
},
disableC2D: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_041: [`disableC2D` shall call its callback immediately if the MQTT connection is already disconnected.]*/
callback();
@ -186,10 +179,16 @@ export class Mqtt extends EventEmitter implements Client.Transport {
/*Codes_SRS_NODE_DEVICE_MQTT_16_044: [`disableMethods` shall call its callback immediately if the MQTT connection is already disconnected.]*/
callback();
},
disableTwin: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_062: [`disableTwin` shall call its callback immediately if the MQTT connection is already disconnected.]*/
callback();
}
enableTwinDesiredPropertiesUpdates: (callback) => {
this._fsm.handle('connect', (err) => {
if (err) {
callback(err);
} else {
this._fsm.handle('enableTwinDesiredPropertiesUpdates', callback);
}
});
},
disableTwinDesiredPropertiesUpdates: (callback) => callback(),
},
connecting: {
_onEnter: (connectCallback) => {
@ -217,8 +216,6 @@ export class Mqtt extends EventEmitter implements Client.Transport {
/*Codes_SRS_NODE_DEVICE_MQTT_16_025: [If `sendEvent` is called while `MqttBase` is establishing the connection, it shall wait until the connection is established and then send the event.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_16_035: [If `sendEvent` is called while `MqttBase` is establishing the connection, and `MqttBase` fails to establish the connection, then sendEvent shall fail.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_16_031: [If `sendTwinRequest` is called while `MqttBase` is establishing the connection, it shall wait until the connection is established and then send the twin request.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_16_036: [If `sendTwinRequest` is called while `MqttBase` is establishing the connection, and `MqttBase` fails to establish the connection, then `sendTwinRequest` shall fail.]*/
'*': () => this._fsm.deferUntilTransition()
},
connected: {
@ -241,21 +238,6 @@ export class Mqtt extends EventEmitter implements Client.Transport {
}
});
},
sendTwinRequest: (topic, body, callback) => {
/* Codes_SRS_NODE_DEVICE_MQTT_18_001: [** The `sendTwinRequest` method shall call the publish method on `MqttTransport`. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_015: [** The `sendTwinRequest` shall publish the request with QOS=0, DUP=0, and Retain=0 **]** */
this._mqtt.publish(topic, body.toString(), { qos: 0, retain: false }, (err, puback) => {
if (err) {
/* Codes_SRS_NODE_DEVICE_MQTT_18_016: [** If an error occurs in the `sendTwinRequest` method, the `done` callback shall be called with the error as the first parameter. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_024: [** If an error occurs, the `sendTwinRequest` shall use the MQTT `translateError` module to convert the mqtt-specific error to a transport agnostic error before passing it into the `done` callback. **]** */
callback(translateError(err));
} else {
/* Codes_SRS_NODE_DEVICE_MQTT_18_004: [** If a `done` callback is passed as an argument, The `sendTwinRequest` method shall call `done` after the body has been published. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_017: [** If the `sendTwinRequest` method is successful, the first parameter to the `done` callback shall be null and the second parameter shall be a MessageEnqueued object. **]** */
callback(null, new results.MessageEnqueued(puback));
}
});
},
updateSharedAccessSignature: (sharedAccessSignature, callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_028: [The `updateSharedAccessSignature` method shall call the `updateSharedAccessSignature` method on the `MqttBase` object if it is connected.]*/
this._mqtt.updateSharedAccessSignature(sharedAccessSignature, (err) => {
@ -288,32 +270,24 @@ export class Mqtt extends EventEmitter implements Client.Transport {
callback(!!err ? translateError(err) : null);
});
},
getTwinReceiver: (callback) => {
/* Codes_SRS_NODE_DEVICE_MQTT_18_005: [** The `getTwinReceiver` method shall call the `done` method after it completes **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_006: [** If a twin receiver for this endpoint did not previously exist, the `getTwinReceiver` method should return the a new `MqttTwinReceiver` object as the second parameter of the `done` function with null as the first parameter. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_007: [** If a twin receiver for this endpoint previously existed, the `getTwinReceiver` method should return the preexisting `MqttTwinReceiver` object as the second parameter of the `done` function with null as the first parameter. **]** */
callback(null, this._twinReceiver);
},
enableC2D: (callback) => {
this._setupSubscription(this._topics.message, callback);
},
enableMethods: (callback) => {
this._setupSubscription(this._topics.method, callback);
},
enableTwin: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_059: [`enableTwin` shall subscribe to the MQTT topics for twins.]*/
this._twinReceiver.subscribe(callback);
},
disableC2D: (callback) => {
this._removeSubscription(this._topics.message, callback);
},
disableMethods: (callback) => {
this._removeSubscription(this._topics.method, callback);
},
disableTwin: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_063: [`disableTwin` shall unsubscribe from the topics for twin messages.]*/
this._twinReceiver.unsubscribe(callback);
}
/*Codes_SRS_NODE_DEVICE_MQTT_16_077: [`getTwin` shall call the `getTwin` method on the `MqttTwinClient` object and pass it its callback.]*/
getTwin: (callback) => this._twinClient.getTwin(callback),
/*Codes_SRS_NODE_DEVICE_MQTT_16_080: [`updateTwinReportedProperties` shall call the `updateTwinReportedProperties` method on the `MqttTwinClient` object and pass it its callback.]*/
updateTwinReportedProperties: (patch, callback) => this._twinClient.updateTwinReportedProperties(patch, callback),
enableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.enableTwinDesiredPropertiesUpdates(callback),
disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(callback)
},
disconnecting: {
_onEnter: (disconnectCallback, err) => {
@ -324,7 +298,6 @@ export class Mqtt extends EventEmitter implements Client.Transport {
});
},
/*Codes_SRS_NODE_DEVICE_MQTT_16_026: [If `sendEvent` is called while `MqttBase` is disconnecting, it shall wait until the disconnection is complete and then try to connect again and send the event. ]*/
/*Codes_SRS_NODE_DEVICE_MQTT_16_032: [If `sendTwinRequest` is called while `MqttBase` is disconnecting, it shall wait until the disconnection is complete and then try to connect again and send the twin request.]*/
'*': () => this._fsm.deferUntilTransition()
}
}
@ -343,9 +316,9 @@ export class Mqtt extends EventEmitter implements Client.Transport {
* @param {Function} done callback that shall be called when the connection is established.
*/
/* Codes_SRS_NODE_DEVICE_MQTT_12_004: [The connect method shall call the connect method on MqttTransport */
connect(done?: (err?: Error, result?: any) => void): void {
this._fsm.handle('connect', done);
}
connect(done?: (err?: Error, result?: any) => void): void {
this._fsm.handle('connect', done);
}
/**
* @private
@ -511,64 +484,6 @@ export class Mqtt extends EventEmitter implements Client.Transport {
});
}
/**
* @private
* @method module:azure-iot-device-mqtt.Mqtt#sendTwinRequest
* @description Send a device-twin specific messager to the IoT Hub instance
*
* @param {String} method name of the method to invoke ('PUSH', 'PATCH', etc)
* @param {String} resource name of the resource to act on (e.g. '/properties/reported/') with beginning and ending slashes
* @param {Object} properties object containing name value pairs for request properties (e.g. { 'rid' : 10, 'index' : 17 })
* @param {String} body body of request
* @param {Function} done the callback to be invoked when this function completes.
*
* @throws {ReferenceError} One of the required parameters is falsy
* @throws {ArgumentError} One of the parameters is an incorrect type
*/
sendTwinRequest(method: string, resource: string, properties: { [key: string]: string }, body: any, done?: (err?: Error, result?: any) => void): void {
/* Codes_SRS_NODE_DEVICE_MQTT_18_008: [** The `sendTwinRequest` method shall not throw `ReferenceError` if the `done` callback is falsy. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_009: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `method` argument is falsy. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_019: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `resource` argument is falsy. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_011: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `properties` argument is falsy. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_013: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `body` argument is falsy. **]** */
if (!method || !resource || !properties || !body) {
throw new ReferenceError('required parameter is missing');
}
/* Codes_SRS_NODE_DEVICE_MQTT_18_010: [** The `sendTwinRequest` method shall throw an `ArgumentError` if the `method` argument is not a string. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_020: [** The `sendTwinRequest` method shall throw an `ArgumentError` if the `resource` argument is not a string. **]** */
if (!util.isString(method) || !util.isString(resource)) {
throw new errors.ArgumentError('required string parameter is not a string');
}
/* Codes_SRS_NODE_DEVICE_MQTT_18_012: [** The `sendTwinRequest` method shall throw an `ArgumentError` if the `properties` argument is not a an object. **]** */
if (!util.isObject(properties)) {
throw new errors.ArgumentError('required properties parameter is not an object');
}
/* Codes_SRS_NODE_DEVICE_MQTT_18_022: [** The `propertyQuery` string shall be construced from the `properties` object. **]** */
let propString = '';
Object.keys(properties).forEach((key) => {
/* Codes_SRS_NODE_DEVICE_MQTT_18_018: [** The `sendTwinRequest` method shall throw an `ArgumentError` if any members of the `properties` object fails to serialize to a string **]** */
if (!util.isString(properties[key]) && !util.isNumber(properties[key]) && !util.isBoolean(properties[key])) {
throw new errors.ArgumentError('required properties object has non-string properties');
}
/* Codes_SRS_NODE_DEVICE_MQTT_18_023: [** Each member of the `properties` object shall add another 'name=value&' pair to the `propertyQuery` string. **]** */
propString += (propString === '') ? '?' : '&';
propString += key + '=' + properties[key];
});
/* Codes_SRS_NODE_DEVICE_MQTT_18_021: [** The topic name passed to the publish method shall be $iothub/twin/`method`/`resource`/?`propertyQuery` **]** */
const topic = '$iothub/twin/' + method + resource + propString;
this._fsm.handle('sendTwinRequest', topic, body, (err, result) => {
if (done) done(err, result);
});
}
/**
* @private
* @method module:azure-iot-device-mqtt.Mqtt.Mqtt#sendMethodResponse
@ -604,24 +519,6 @@ export class Mqtt extends EventEmitter implements Client.Transport {
this._fsm.handle('sendMethodResponse', response, done);
}
/**
* @private
* @method module:azure-iot-device-mqtt.Mqtt#getTwinReceiver
* @description Get a receiver object that handles C2D device-twin traffic
*
* @param {Function} done the callback to be invoked when this function completes.
*
* @throws {ReferenceError} One of the required parameters is falsy
*/
getTwinReceiver(done?: (err?: Error, receiver?: MqttTwinReceiver) => void): void {
/* Codes_SRS_NODE_DEVICE_MQTT_18_014: [** The `getTwinReceiver` method shall throw an `ReferenceError` if done is falsy **]** */
if (!done) {
throw new ReferenceError('required parameter is missing');
}
this._fsm.handle('getTwinReceiver', done);
}
/**
* @private
*/
@ -661,15 +558,29 @@ export class Mqtt extends EventEmitter implements Client.Transport {
/**
* @private
*/
enableTwin(callback: (err?: Error) => void): void {
this._fsm.handle('enableTwin', callback);
getTwin(callback: (err?: Error, twin?: TwinProperties) => void): void {
this._fsm.handle('getTwin', callback);
}
/**
* @private
*/
disableTwin(callback: (err?: Error) => void): void {
this._fsm.handle('disableTwin', callback);
updateTwinReportedProperties(patch: any, callback: (err?: Error) => void): void {
this._fsm.handle('updateTwinReportedProperties', patch, callback);
}
/**
* @private
*/
enableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void {
this._fsm.handle('enableTwinDesiredPropertiesUpdates', callback);
}
/**
* @private
*/
disableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void {
this._fsm.handle('disableTwinDesiredPropertiesUpdates', callback);
}
protected _configureEndpoints(credentials: TransportConfig): void {

Просмотреть файл

@ -0,0 +1,266 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
import { EventEmitter } from 'events';
import { TwinProperties } from 'azure-iot-device';
import { MqttBase, translateError } from 'azure-iot-mqtt-base';
import * as querystring from 'querystring';
import * as url from 'url';
import * as machina from 'machina';
import * as dbg from 'debug';
const debug = dbg('azure-iot-device-mqtt:MqttTwinClient');
// $iothub/twin/PATCH/properties/reported/?$rid={request id}&$version={base version}
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_009: [** The subscribed topic for `response` events shall be '$iothub/twin/res/#' **]** */
const responseTopic = '$iothub/twin/res/#';
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_019: [** The subscribed topic for post events shall be $iothub/twin/PATCH/properties/desired/# **]** */
const desiredPropertiesUpdatesTopic = '$iothub/twin/PATCH/properties/desired/#';
interface TopicSubscription {
mqttClient: MqttBase;
topic: string;
}
/**
* @private
* @class module:azure-iot-device-mqtt.MqttTwinReceiver
* @classdesc Acts as a receiver for device-twin traffic
*
* @param {Object} config configuration object
* @fires MqttTwinReceiver#subscribed an MQTT topic has been successfully subscribed to
* @fires MqttTwinReceiver#error an error has occured while subscribing to an MQTT topic
* @fires MqttTwinReceiver#response a response message has been received from the service
* @fires MqttTwinReceiver#post a post message has been received from the service
* @throws {ReferenceError} If client parameter is falsy.
*
*/
export class MqttTwinClient extends EventEmitter {
static desiredPropertiesUpdateEvent: string = 'twinDesiredPropertiesUpdate';
private _mqtt: MqttBase;
private _pendingTwinRequests: { [key: string]: any } = {};
private _topicFsm: machina.BehavioralFsm;
private _responseTopic: TopicSubscription;
private _desiredPropertiesUpdatesTopic: TopicSubscription;
constructor(client: MqttBase) {
super();
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_18_001: [The `MqttTwinClient` constructor shall accept a `client` object.]*/
this._mqtt = client;
const messageHandler = this._onMqttMessage.bind(this);
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_001: [The `MqttTwinClient` constructor shall immediately subscribe to the `message` event of the `client` object.]*/
this._mqtt.on('message', messageHandler);
this._topicFsm = new machina.BehavioralFsm({
initialState: 'unsubscribed',
states: {
unsubscribed: {
_onEnter: function (client: TopicSubscription, err: Error, callback: (err?: Error) => void): void {
if (callback) {
callback(err);
}
},
subscribe: function (client: TopicSubscription, callback: (err?: Error) => void): void {
this.transition(client, 'subscribing', callback);
},
unsubscribe: function (client: TopicSubscription, callback: (err?: Error) => void): void {
// not entirely sure about that. if subscription are restored because cleanSession is false, it means technically a user may want to unsubscribe
// even though subscribe hasn't been called yet.
callback();
}
},
subscribing: {
_onEnter: function (client: TopicSubscription, callback: (err?: Error) => void): void {
client.mqttClient.subscribe(client.topic, { qos: 0 }, (err, result) => {
if (err) {
this.transition(client, 'unsubscribed', err, callback);
} else {
debug('subscribed to response topic: ' + JSON.stringify(result));
this.transition(client, 'subscribed', callback);
}
});
},
'*': function (client: TopicSubscription): void { this.deferUntilTransition(client); }
},
subscribed: {
_onEnter: function (client: TopicSubscription, callback: (err?: Error) => void): void {
callback();
},
subscribe: function (client: TopicSubscription, callback: (err?: Error) => void): void {
callback();
},
unsubscribe: function (client: TopicSubscription, callback: (err?: Error) => void): void {
this.transition(client, 'unsubscribing', callback);
}
},
unsubscribing: {
_onEnter: function (client: TopicSubscription, callback: (err?: Error) => void): void {
client.mqttClient.unsubscribe(client.topic, (err, result) => {
if (err) {
debug('failed to unsubscribe: ' + err.toString());
} else {
debug('unsubscribed from: ' + client.topic);
}
this.transition(client, 'unsubscribed', err, callback);
});
},
'*': function (client: TopicSubscription): void { this.deferUntilTransition(client); }
}
},
subscribe: function (client: TopicSubscription, callback: (err?: Error) => void): void {
this.handle(client, 'subscribe', callback);
},
unsubscribe: function (client: TopicSubscription, callback: (err?: Error) => void): void {
this.handle(client, 'unsubscribe', callback);
}
});
this._responseTopic = {
mqttClient: this._mqtt,
topic: responseTopic
};
this._desiredPropertiesUpdatesTopic = {
mqttClient: this._mqtt,
topic: desiredPropertiesUpdatesTopic
};
}
getTwin(callback: (err?: Error, twin?: TwinProperties) => void): void {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_002: [The `getTwin` method shall subscribe to the `$iothub/twin/res/#` topic if not already subscribed.]*/
this._topicFsm.subscribe(this._responseTopic, (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_009: [If subscribing to the response topic fails, the callback shall be called with the translated version of the error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
callback(translateError(err));
} else {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_003: [The `getTwin` method shall publish the request message on the `$iothub/twin/get/?rid=<requestId>` topic using the `MqttBase.publish` method.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_006: [The request message published by the `getTwin` method shall have an empty body.]*/
this._sendTwinRequest('GET', '/', ' ', callback);
}
});
}
updateTwinReportedProperties(patch: any, callback: (err?: Error) => void): void {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_011: [The `updateTwinReportedProperties` method shall subscribe to the `$iothub/twin/res/#` topic if not already subscribed.]*/
this._topicFsm.subscribe(this._responseTopic, (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_012: [If subscribing to the response topic fails, the callback shall be called with the translated version of the error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
callback(translateError(err));
} else {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_013: [The `updateTwinReportedProperties` method shall publish the request message on the `$iothub/twin/patch/properties/reported/?rid=<requestId>` topic using the `MqttBase.publish` method.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_016: [The body of the request message published by the `updateTwinReportedProperties` method shall be a JSON string of the reported properties patch.]*/
this._sendTwinRequest('PATCH', '/properties/reported/', JSON.stringify(patch), callback);
}
});
}
enableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_019: [The `enableTwinDesiredPropertiesUpdates` shall subscribe to the `$iothub/twin/PATCH/properties/desired/#` topic using the `MqttBase.subscribe` method if it hasn't been subscribed to already.]*/
this._topicFsm.subscribe(this._desiredPropertiesUpdatesTopic, (err, suback) => {
if (err) {
debug('failed to subscribe to desired properties updates: ' + err.toString());
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_021: [if subscribing fails with an error the `enableTwinDesiredPropertiesUpdates` shall call its callback with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
callback(translateError(err));
} else {
debug('suback: ' + JSON.stringify(suback));
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_020: [The `enableTwinDesiredPropertiesUpdates` shall call its callback with no arguments if the subscription is successful.]*/
callback();
}
});
}
disableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_022: [The `disableTwinDesiredPropertiesUpdates` shall unsubscribe from the `$iothub/twin/PATCH/properties/desired/#` topic using the `MqttBase.unsubscribe` method if it hasn't been unsubscribed from already.]*/
this._topicFsm.unsubscribe(this._desiredPropertiesUpdatesTopic, (err, suback) => {
if (err) {
debug('failed to subscribe to desired properties updates: ' + err.toString());
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_024: [if unsubscribing fails with an error the `disableTwinDesiredPropertiesUpdates` shall call its callback with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
callback(translateError(err));
} else {
debug('suback: ' + JSON.stringify(suback));
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_023: [The `disableTwinDesiredPropertiesUpdates` shall call its callback with no arguments if the unsubscription is successful.]*/
callback();
}
});
}
private _sendTwinRequest(method: string, resource: string, body: any, callback?: (err?: Error, result?: any) => void): void {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_005: [The `requestId` property in the topic querystring should be set to a random unique integer that will be used to identify the response later on.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_015: [The `requestId` property in the topic querystring should be set to a random unique integer that will be used to identify the response later on.]*/
const requestId = Math.ceil(Math.random() * 10000).toString();
let propString = '?$rid=' + requestId;
const topic = '$iothub/twin/' + method + resource + propString;
this._pendingTwinRequests[requestId] = callback;
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_004: [The `getTwin` method shall publish the request message with QoS=0, DUP=0 and Retain=0.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_014: [The `updateTwinReportedProperties` method shall publish the request message with QoS=0, DUP=0 and Retain=0.]*/
this._mqtt.publish(topic, body.toString(), { qos: 0, retain: false }, (err, puback) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_008: [If an error happen while publishing the request message, the `callback` shall be called with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_018: [If an error happen while publishing the request message, the `callback` shall be called with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
delete this._pendingTwinRequests[requestId];
callback(translateError(err));
} else {
debug('twin request sent: ' + puback);
}
});
}
private _onMqttMessage(topic: string, message: any): void {
debug('mqtt message received');
if (topic.indexOf('$iothub/twin/res') === 0) {
debug('response message received');
this._onResponseMessage(topic, message);
} else if (topic.indexOf('$iothub/twin/PATCH/properties/desired') === 0) {
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_025: [Once the desired properties update topic has been subscribed to the `MqttTwinClient` shall emit a `twinDesiredPropertiesUpdate` event for messages received on that topic.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_026: [The argument of the `twinDesiredPropertiesUpdate` event emitted shall be the object parsed from the JSON string contained in the received message.]*/
this.emit(MqttTwinClient.desiredPropertiesUpdateEvent, JSON.parse(message));
} else {
debug('Message received on a topic we can ignore: ' + topic);
}
}
private _onResponseMessage(topic: string, message: any): void {
let urlObject: url.Url;
let path: string[];
let query: any;
try {
urlObject = url.parse(topic);
path = urlObject.path.split('/');
query = querystring.parse(urlObject.query as string);
} catch (err) {
return;
}
if ((path[0] === '$iothub') &&
(path[1] === 'twin') &&
(path[2] === 'res') &&
(path[3]) &&
(path[3].toString().length > 0) &&
(query.$rid) &&
(query.$rid.toString().length > 0)) {
if (this._pendingTwinRequests[query.$rid]) {
const requestCallback = this._pendingTwinRequests[query.$rid];
delete this._pendingTwinRequests[query.$rid];
// should we really ignore the status code?
const responseBody = message.toString();
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_007: [When a message is received on the response topic with an `$rid` property in the query string of the topic matching the one that was sent on the request topic, the `callback` shall be called with a `null` error object and the parsed content of the response message.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_017: [When a message is received on the response topic with an `$rid` property in the query string of the topic matching the one that was sent on the request topic, the `callback` shall be called with a `null` error object.]*/
requestCallback(null, responseBody ? JSON.parse(responseBody) : undefined);
} else {
debug('received a response for a request we do not know about: ' + query.$rid);
}
} else {
debug('received a response with a malformed topic property: ' + topic);
}
}
}

Просмотреть файл

@ -1,239 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
import { EventEmitter } from 'events';
import { MqttBase, translateError } from 'azure-iot-mqtt-base';
import * as querystring from 'querystring';
import * as url from 'url';
import * as machina from 'machina';
import * as dbg from 'debug';
const debug = dbg('azure-iot-device-mqtt:MqttTwinReceiver');
// $iothub/twin/PATCH/properties/reported/?$rid={request id}&$version={base version}
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_009: [** The subscribed topic for `response` events shall be '$iothub/twin/res/#' **]** */
const responseTopic = '$iothub/twin/res/#';
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_019: [** The subscribed topic for post events shall be $iothub/twin/PATCH/properties/desired/# **]** */
const postTopic = '$iothub/twin/PATCH/properties/desired/#';
/**
* @private
* @class module:azure-iot-device-mqtt.MqttTwinReceiver
* @classdesc Acts as a receiver for device-twin traffic
*
* @param {Object} config configuration object
* @fires MqttTwinReceiver#subscribed an MQTT topic has been successfully subscribed to
* @fires MqttTwinReceiver#error an error has occured while subscribing to an MQTT topic
* @fires MqttTwinReceiver#response a response message has been received from the service
* @fires MqttTwinReceiver#post a post message has been received from the service
* @throws {ReferenceError} If client parameter is falsy.
*
*/
export class MqttTwinReceiver extends EventEmitter {
static errorEvent: string = 'error';
static responseEvent: string = 'response';
static postEvent: string = 'post';
static subscribedEvent: string = 'subscribed';
private _mqtt: MqttBase;
private _fsm: machina.Fsm;
constructor(client: MqttBase) {
super();
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_001: [** The `MqttTwinReceiver` constructor shall accept a `client` object **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_002: [** The `MqttTwinReceiver` constructor shall throw `ReferenceError` if the `client` object is falsy **]** */
if (!client) {
throw new ReferenceError('required parameter is missing');
}
this._mqtt = client;
this.on('newListener', (eventName) => {
if (eventName === MqttTwinReceiver.responseEvent || eventName === MqttTwinReceiver.postEvent) {
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_003: [** When a listener is added for the `response` event, the appropriate topic shall be asynchronously subscribed to. **]** */
this._fsm.handle('subscribe', (err) => {
if (err) {
debug('error while subscribing: ' + err.toString());
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_023: [** If the `error` event is subscribed to, an `error` event shall be emitted if any asynchronous subscribing operations fails. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_024: [** When the `error` event is emitted, the first parameter shall be an error object obtained via the MQTT `translateError` module. **]** */
this.emit(MqttTwinReceiver.errorEvent, translateError(err));
}
});
}
});
this.on('removeListener', (eventName) => {
if (eventName === MqttTwinReceiver.responseEvent || eventName === MqttTwinReceiver.postEvent) {
if (this.listeners('response').length + this.listeners('post').length <= 0) {
this._fsm.handle('unsubscribe', (err) => {
if (err) {
debug('error while unsubscribing: ' + err.toString());
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_024: [** When the `error` event is emitted, the first parameter shall be an error object obtained via the MQTT `translateError` module. **]** */
this.emit(MqttTwinReceiver.errorEvent, translateError(err));
}
});
}
}
});
const messageHandler = this._onMqttMessage.bind(this);
this._fsm = new machina.Fsm({
namespace: 'device-mqtt',
initialState: 'unsubscribed',
states: {
unsubscribed: {
_onEnter: (callback, err) => {
this._mqtt.removeListener('message', messageHandler);
if (callback) {
callback(err);
} else if (err) {
this.emit('error', translateError(err));
}
},
subscribe: (callback) => this._fsm.transition('subscribingToResponseTopic', callback),
unsubscribe: (callback) => callback()
},
subscribingToResponseTopic: {
_onEnter: (callback) => {
debug('subscribing to response topic');
this._mqtt.on('message', messageHandler);
this._mqtt.subscribe(responseTopic, { qos: 0 }, (err, responseTopicResult) => {
if (err) {
this._fsm.transition('unsubscribed', callback, err);
} else {
debug('subscribed to response topic');
this._fsm.transition('subscribingToPostTopic', callback, responseTopicResult);
}
});
},
'*': () => this._fsm.deferUntilTransition()
},
subscribingToPostTopic: {
_onEnter: (callback, responseTopicResult) => {
debug('subscribed to post topic');
this._mqtt.subscribe(postTopic, { qos: 0 }, (err, postTopicResult) => {
if (err) {
this._fsm.transition('unsubscribingFromResponseTopic', callback, err);
} else {
debug('subscribed to post topic');
this._fsm.transition('subscribed', callback, responseTopicResult, postTopicResult);
}
});
},
'*': () => this._fsm.deferUntilTransition()
},
subscribed: {
_onEnter: (callback, responseTopicResult, postTopicResult) => {
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_025: [** If the `subscribed` event is subscribed to, a `subscribed` event shall be emitted after an MQTT topic is subscribed to. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_028: [** When the `subscribed` event is emitted, the parameter shall be an object which contains an `eventName` field and an `transportObject` field. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_026: [** When the `subscribed` event is emitted because the response MQTT topic was subscribed, the parameter shall be the string 'response' **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_029: [** When the subscribed event is emitted, the `transportObject` field shall contain the object returned by the library in the subscription response. **]** */
this.emit(MqttTwinReceiver.subscribedEvent, { 'eventName' : MqttTwinReceiver.responseEvent, 'transportObject' : responseTopicResult });
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_025: [** If the `subscribed` event is subscribed to, a `subscribed` event shall be emitted after an MQTT topic is subscribed to. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_028: [** When the `subscribed` event is emitted, the parameter shall be an object which contains an `eventName` field and an `transportObject` field. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_027: [** When the `subscribed` event is emitted because the post MQTT topic was subscribed, the `eventName` field shall be the string 'post' **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_029: [** When the subscribed event is emitted, the `transportObject` field shall contain the object returned by the library in the subscription response. **]** */
this.emit(MqttTwinReceiver.subscribedEvent, { 'eventName' : MqttTwinReceiver.postEvent, 'transportObject' : postTopicResult });
callback();
},
subscribe: (callback) => callback(),
unsubscribe: (callback) => this._fsm.transition('unsubscribingFromPostTopic', callback)
},
unsubscribingFromPostTopic: {
_onEnter: (callback) => {
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_021: [** When there are no more listeners for the post event, the topic should be unsubscribed. **]** */
this._mqtt.unsubscribe(postTopic, (err) => {
this._fsm.transition('unsubscribingFromResponseTopic', callback, err);
});
},
'*': () => this._fsm.deferUntilTransition()
},
unsubscribingFromResponseTopic: {
_onEnter: (callback, forwardedError) => {
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_005: [** When there are no more listeners for the `response` event, the topic should be unsubscribed **]** */
this._mqtt.unsubscribe(responseTopic, (err) => {
let finalError = forwardedError || err;
this._fsm.transition('unsubscribed', callback, finalError);
});
},
'*': () => this._fsm.deferUntilTransition()
}
}
});
}
subscribe(callback: (err?: Error) => void): void {
this._fsm.handle('subscribe', callback);
}
unsubscribe(callback: (err?: Error) => void): void {
this._fsm.handle('unsubscribe', callback);
}
private _onMqttMessage(topic: string, message: any): void {
debug('mqtt message received');
if (topic.indexOf('$iothub/twin/res') === 0) {
debug('response message received');
this._onResponseMessage(topic, message);
} else if (topic.indexOf('$iothub/twin/PATCH') === 0) {
debug('post message received');
this._onPostMessage(topic, message);
}
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_014: [** Any messages received on topics which violate the topic name formatting shall be ignored. **]** */
}
private _onResponseMessage(topic: string, message: any): void {
let urlObject: url.Url;
let path: string[];
let query: any;
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_014: [** Any messages received on topics which violate the topic name formatting shall be ignored. **]** */
try {
urlObject = url.parse(topic);
path = urlObject.path.split('/');
query = querystring.parse(urlObject.query as string);
} catch (err) {
return;
}
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_006: [** When a `response` event is emitted, the parameter shall be an object which contains `status`, `requestId` and `body` members **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_010: [** The topic which receives the response shall be formatted as '$iothub/twin/res/{status}/?$rid={request id}' **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_015: [** the {status} and {request id} fields in the topic name are required. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_016: [** The {status} and {request id} fields in the topic name shall be strings **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_017: [** The {status} and {request id} fields in the topic name cannot be zero length. **]** */
if ((path[0] === '$iothub') &&
(path[1] === 'twin') &&
(path[2] === 'res') &&
(path[3]) &&
(path[3].toString().length > 0) &&
(query.$rid) &&
(query.$rid.toString().length > 0)) {
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_011: [** The `status` parameter of the `response` event shall be parsed out of the topic name from the {status} field **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_012: [** The `requestId` parameter of the `response` event shall be parsed out of the topic name from the {request id} field **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_013: [** The `body` parameter of the `response` event shall be the body of the received MQTT message **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_011: [** The `status` parameter of the `response` event shall be parsed out of the topic name from the {status} field **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_012: [** The `requestId` parameter of the `response` event shall be parsed out of the topic name from the {request id} field **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_013: [** The `body` parameter of the `response` event shall be the body of the received MQTT message **]** */
const response = {
'topic' : topic,
'status' : Number(path[3]),
'$rid' : query.$rid,
'body' : message
};
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_004: [** If there is a listener for the `response` event, a `response` event shall be emitted for each response received. **]** */
this.emit(MqttTwinReceiver.responseEvent, response);
}
}
private _onPostMessage(topic: string, message: any): void {
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_020: [** If there is a listener for the post event, a post event shal be emitteded for each post message received **]** */
if (topic.indexOf('$iothub/twin/PATCH/properties/desired') === 0) {
/* Codes_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_022: [** When a post event it emitted, the parameter shall be the body of the message **]** */
this.emit(MqttTwinReceiver.postEvent, message);
}
}
}

Просмотреть файл

@ -1,233 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
var EventEmitter = require('events').EventEmitter;
var assert = require('chai').assert;
var Mqtt = require('../lib/mqtt').Mqtt;
var errors = require('azure-iot-common').errors;
var sinon = require('sinon');
var MqttTwinReceiver = require('../lib/mqtt_twin_receiver.js').MqttTwinReceiver;
var config = {
'host' : 'mock_host',
'deviceId' : 'mock_deviceId',
'sharedAccessSignature' : 'mock_sharedAccessSignature'
};
var fakeAuthenticationProvider;
var fakeMqttBase;
var transport;
describe('Mqtt: Device Twin', function () {
beforeEach(function() {
fakeAuthenticationProvider = {
getDeviceCredentials: sinon.stub().callsFake(function (callback) {
callback(null, config);
})
};
fakeMqttBase = new EventEmitter();
fakeMqttBase.connect = sinon.stub().callsArg(1);
fakeMqttBase.disconnect = sinon.stub().callsArg(0);
fakeMqttBase.publish = sinon.stub().callsArg(3);
fakeMqttBase.subscribe = sinon.stub().callsArg(2);
fakeMqttBase.unsubscribe = sinon.stub().callsArg(1);
fakeMqttBase.updateSharedAccessSignature = sinon.stub().callsArg(1);
transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
});
describe('#sendTwinRequest', function () {
/*Tests_SRS_NODE_DEVICE_MQTT_16_029: [The `sendTwinRequest` method shall connect the Mqtt connection if it is disconnected.]*/
it('connects the transport if currently disconnected', function (testCallback) {
transport.sendTwinRequest('PUT', '/res', {'rid':10}, 'body', function () {
assert.isTrue(fakeMqttBase.connect.calledOnce);
assert.isTrue(fakeMqttBase.publish.calledOnce);
testCallback();
});
});
/*Tests_SRS_NODE_DEVICE_MQTT_16_033: [The `sendTwinRequest` method shall call its callback with an error translated using `translateError` if `MqttBase` fails to connect.]*/
it('calls the callback with an error if the transport fails to connect', function (testCallback) {
fakeMqttBase.connect = sinon.stub().callsArgWith(1, new Error('fake error'));
transport.sendTwinRequest('PUT', '/res', {'rid':10}, 'body', function (err) {
assert.isTrue(fakeMqttBase.connect.calledOnce);
assert.instanceOf(err, Error);
testCallback();
});
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_001: [** The `sendTwinRequest` method shall call the publish method on `MqttBase`. **]** */
it('calls publish method on transport', function (testCallback) {
transport.connect(function() {
transport.sendTwinRequest('PUT', '/res', {'rid':10}, 'body', function() {
assert(fakeMqttBase.publish.calledOnce);
testCallback();
});
});
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_008: [** The `sendTwinRequest` method shall not throw if the `done` callback is falsy. **]** */
it('does not throw if done is falsy', function() {
transport.connect();
assert.doesNotThrow(function() {
transport.sendTwinRequest('PUT', '/res', {'rid':10}, 'body', null);
});
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_009: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `method` argument is falsy. **]** */
it('throws if method is falsy', function() {
transport.connect();
assert.throws(function() {
transport.sendTwinRequest(null, '/res', {'rid':10}, 'body', function() {});
}, ReferenceError);
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_010: [** The `sendTwinRequest` method shall throw an `ArgumentError` if the `method` argument is not a string. **]** */
it('throws if method is not a string', function() {
transport.connect();
assert.throws(function() {
transport.sendTwinRequest({}, '/res', {'rid':10}, 'body', function() {});
}, errors.ArgumentError);
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_019: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `resource` argument is falsy. **]** */
it('throws if resource is falsy', function() {
transport.connect();
assert.throws(function() {
transport.sendTwinRequest('PUT', null, {'rid':10}, 'body', function() {});
}, ReferenceError);
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_020: [** The `sendTwinRequest` method shall throw an `ArgumentError` if the `resource` argument is not a string. **]** */
it('throws if resource is not a string', function() {
transport.connect();
assert.throws(function() {
transport.sendTwinRequest('PUT', {}, {'rid':10}, 'body', function() {});
}, errors.ArgumentError);
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_011: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `properties` argument is falsy. **]** */
it('throws if properties is falsy', function() {
transport.connect();
assert.throws(function() {
transport.sendTwinRequest('PUT', '/res', null, 'body', function() {});
}, ReferenceError);
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_012: [** The `sendTwinRequest` method shall throw an `ArgumentError` if the `properties` argument is not a an object. **]** */
it('throws if properties is not an object', function() {
transport.connect();
assert.throws(function() {
transport.sendTwinRequest('PUT', '/res', 'properties', 'body', function() {});
}, errors.ArgumentError);
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_018: [** The `sendTwinRequest` method shall throw an `ArgumentError` if any members of the `properties` object fails to serialize to a string **]** */
it('throws if properties fails to deserialize to a string', function() {
transport.connect();
assert.throws(function() {
transport.sendTwinRequest('PUT', '/res', {'func': function() {}}, 'body', function() {});
}, errors.ArgumentError);
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_013: [** The `sendTwinRequest` method shall throw an `ReferenceError` if the `body` argument is falsy. **]** */
it('throws if body is falsy', function() {
transport.connect();
assert.throws(function() {
transport.sendTwinRequest('PUT', '/res', {'rid' : 10}, null, function() {});
}, ReferenceError);
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_022: [** The `propertyQuery` string shall be construced from the `properties` object. **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_18_023: [** Each member of the `properties` object shall add another 'name=value&' pair to the `propertyQuery` string. **]** */
it('correctly builds properties string', function() {
transport.connect();
transport.sendTwinRequest('a/', 'b/', {'rid' : 10, 'pid' : 20}, 'body', function() {});
assert(fakeMqttBase.publish.calledWith('$iothub/twin/a/b/?rid=10&pid=20'));
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_004: [** If a `done` callback is passed as an argument, The `sendTwinRequest` method shall call `done` after the body has been published. **]** */
it('calls done after the body has been published', function(done) {
transport.connect();
transport.sendTwinRequest('PUT', '/res', {'rid':10}, 'body', done);
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_021: [** The topic name passed to the publish method shall be $iothub/twin/`method`/`resource`/?`propertyQuery` **]** */
it('uses the correct topic name', function() {
// 7.5.2: $iothub/twin/PATCH/properties/reported/?$rid={request id}&$version={base version}
transport.connect();
transport.sendTwinRequest('PATCH', '/properties/reported/', {'$rid':10, '$version': 200}, 'body', function() {});
assert(fakeMqttBase.publish.calledWith('$iothub/twin/PATCH/properties/reported/?$rid=10&$version=200'));
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_015: [** The `sendTwinRequest` shall publish the request with QOS=0, DUP=0, and Retain=0 **]** */
it('uses the correct publish parameters', function() {
transport.connect();
transport.sendTwinRequest('PATCH', '/properties/reported/', {'$rid':10, '$version': 200}, 'body', function() {});
var publishoptions = fakeMqttBase.publish.getCall(0).args[2];
assert.equal(publishoptions.qos, 0);
assert.equal(publishoptions.retain, false);
// no way to verify DUP flag
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_016: [** If an error occurs in the `sendTwinRequest` method, the `done` callback shall be called with the error as the first parameter. **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_18_024: [** If an error occurs, the `sendTwinRequest` shall use the MQTT `translateError` module to convert the mqtt-specific error to a transport agnostic error before passing it into the `done` callback. **]** */
it('calls done with an error on failure', function(done) {
fakeMqttBase.publish = sinon.stub().callsArgWith(3, new Error('Invalid topic'));
transport.connect();
transport.sendTwinRequest('PATCH', '/properties/reported/', {'$rid':10, '$version': 200}, 'body', function(err) {
assert.equal(err.constructor.name, 'FormatError');
done();
});
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_017: [** If the `sendTwinRequest` method is successful, the first parameter to the `done` callback shall be null and the second parameter shall be a MessageEnqueued object. **]** */
it('returns MessageEnqueued object on success', function(done) {
transport.connect();
transport.sendTwinRequest('PATCH', '/properties/reported/', {'$rid':10, '$version': 200}, 'body', function(err, res) {
if (err) {
done(err);
} else {
assert.equal(res.constructor.name, 'MessageEnqueued');
done();
}
});
});
});
describe('#getTwinReceiver', function () {
/* Tests_SRS_NODE_DEVICE_MQTT_18_014: [** The `getTwinReceiver` method shall throw an `ReferenceError` if done is falsy **]** */
it ('throws if done is falsy', function() {
assert.throws(function() {
transport.getTwinReceiver();
}, ReferenceError);
});
/* Tests_SRS_NODE_DEVICE_MQTT_18_005: [** The `getTwinReceiver` method shall call the `done` method after it completes **]** */
it ('calls done when complete', function(done) {
transport.connect(function () {
transport.getTwinReceiver(done);
});
});
});
it('connects the transport if disconnected', function (testCallback) {
transport.getTwinReceiver(function () {
assert.isTrue(fakeMqttBase.connect.calledOnce);
testCallback();
});
});
it('calls the callback with an error if the transport fails to connect', function (testCallback) {
fakeMqttBase.connect = sinon.stub().callsArgWith(1, new Error('fake'));
transport.getTwinReceiver(function (err) {
assert.instanceOf(err, Error);
testCallback();
});
});
});

Просмотреть файл

@ -163,7 +163,8 @@ describe('Mqtt as MqttReceiver', function () {
});
describe('#method', function() {
// Tests_SRS_NODE_DEVICE_MQTT_RECEIVER_13_002: [ When a listener is added for the method event, the topic should be subscribed to. ]*/
/* Tests_SRS_NODE_DEVICE_MQTT_RECEIVER_13_002: [ When a listener is added for the method event, the topic should be subscribed to. ]*/
// note: that test really does not test this requirement. Not since we introduced the enable/disable methods
it('does not subscribe twice to the same topic for multiple event registrations', function () {
var receiver = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
receiver.connect(function () {
@ -173,8 +174,8 @@ describe('Mqtt as MqttReceiver', function () {
receiver.on('message', function () { });
// assert
assert.isTrue(fakeMqttBase.on.calledTwice,
'mqttClient.on was not called twice (error + message)');
assert.isTrue(fakeMqttBase.on.calledThrice,
'mqttClient.on was not called thrice (error + message * 2 (c2d + twin)');
assert.isTrue(fakeMqttBase.on.calledWith('message'),
'mqttClient.on was not called for "message" event');
assert.isTrue(fakeMqttBase.on.calledWith('error'),

Просмотреть файл

@ -14,9 +14,8 @@ var AuthenticationType = require('azure-iot-common').AuthenticationType;
var EventEmitter = require('events').EventEmitter;
describe('Mqtt', function () {
var fakeConfig, fakeAuthenticationProvider;
var fakeConfig, fakeAuthenticationProvider, fakeMqttBase;
var fakeMqttBase;
beforeEach(function () {
fakeConfig = {
host: 'host.name',
@ -681,8 +680,8 @@ describe('Mqtt', function () {
topicName: '$iothub/methods/POST/#'
},
{
methodName: 'enableTwin',
topicName: '$iothub/twin/res/#'
methodName: 'enableTwinDesiredPropertiesUpdates',
topicName: '$iothub/twin/PATCH/properties/desired/#'
}
].forEach(function (testConfig) {
describe('#' + testConfig.methodName, function () {
@ -691,15 +690,15 @@ describe('Mqtt', function () {
transport[testConfig.methodName](function (err) {
/*Tests_SRS_NODE_DEVICE_MQTT_16_047: [`enableC2D` shall connect the MQTT connection if it is disconnected.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_038: [`enableMethods` shall connect the MQTT connection if it is disconnected.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_057: [`enableTwin` shall connect the MQTT connection if it is disconnected.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_057: [`enableTwinDesiredPropertiesUpdates` shall connect the MQTT connection if it is disconnected.]*/
assert.isTrue(fakeMqttBase.connect.calledOnce);
/*Tests_SRS_NODE_DEVICE_MQTT_16_050: [`enableC2D` shall call its callback with no arguments when the `SUBACK` packet is received.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_051: [`enableMethods` shall call its callback with no arguments when the `SUBACK` packet is received.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_060: [`enableTwin` shall call its callback with no arguments when the `SUBACK` packet is received.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_060: [`enableTwinDesiredPropertiesUpdates` shall call its callback with no arguments when the `SUBACK` packet is received.]*/
assert.isUndefined(err);
/*Tests_SRS_NODE_DEVICE_MQTT_16_049: [`enableC2D` shall subscribe to the MQTT topic for messages.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_040: [`enableMethods` shall subscribe to the MQTT topic for direct methods.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_059: [`enableTwin` shall subscribe to the MQTT topics for twins.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_059: [`enableTwinDesiredPropertiesUpdates` shall subscribe to the MQTT topics for twins.]*/
assert.isTrue(fakeMqttBase.subscribe.calledWith(testConfig.topicName));
testCallback();
});
@ -707,7 +706,7 @@ describe('Mqtt', function () {
/*Tests_SRS_NODE_DEVICE_MQTT_16_048: [`enableC2D` shall calls its callback with an `Error` object if it fails to connect.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_039: [`enableMethods` shall calls its callback with an `Error` object if it fails to connect.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_058: [`enableTwin` shall calls its callback with an `Error` object if it fails to connect.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_058: [`enableTwinDesiredPropertiesUpdates` shall calls its callback with an `Error` object if it fails to connect.]*/
it('calls its callback with an error if it fails to connect', function (testCallback) {
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
fakeMqttBase.connect = sinon.stub().callsArgWith(1, new Error('fake error'));
@ -720,7 +719,7 @@ describe('Mqtt', function () {
/*Tests_SRS_NODE_DEVICE_MQTT_16_052: [`enableC2D` shall call its callback with an `Error` if subscribing to the topic fails.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_053: [`enableMethods` shall call its callback with an `Error` if subscribing to the topic fails.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_061: [`enableTwin` shall call its callback with an `Error` if subscribing to the topics fails.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_061: [`enableTwinDesiredPropertiesUpdates` shall call its callback with an `Error` if subscribing to the topics fails.]*/
it('calls its callback with an error if subscribing fails', function (testCallback) {
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
fakeMqttBase.subscribe = sinon.stub().callsArgWith(2, new Error('fake error'));
@ -738,12 +737,12 @@ describe('Mqtt', function () {
[
{ enableFeatureMethod: 'enableC2D', disableFeatureMethod: 'disableC2D' },
{ enableFeatureMethod: 'enableMethods', disableFeatureMethod: 'disableMethods' },
{ enableFeatureMethod: 'enableTwin', disableFeatureMethod: 'disableTwin' }
{ enableFeatureMethod: 'enableTwinDesiredPropertiesUpdates', disableFeatureMethod: 'disableTwinDesiredPropertiesUpdates' }
].forEach(function (testConfig) {
describe('#' + testConfig.disableFeatureMethod, function () {
/*Tests_SRS_NODE_DEVICE_MQTT_16_041: [`disableC2D` shall call its callback immediately if the MQTT connection is already disconnected.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_044: [`disableMethods` shall call its callback immediately if the MQTT connection is already disconnected.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_062: [`disableTwin` shall call its callback immediately if the MQTT connection is already disconnected.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_062: [`disableTwinDesiredPropertiesUpdates` shall call its callback immediately if the MQTT connection is already disconnected.]*/
it('immediately calls its callback if the disconnected', function (testCallback) {
var mqtt = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
mqtt[testConfig.disableFeatureMethod](function () {
@ -755,7 +754,7 @@ describe('Mqtt', function () {
/*Tests_SRS_NODE_DEVICE_MQTT_16_043: [`disableC2D` shall call its callback with an `Error` if an error is received while unsubscribing.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_046: [`disableMethods` shall call its callback with an `Error` if an error is received while unsubscribing.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_065: [`disableTwin` shall call its callback with an `Error` if an error is received while unsubscribing.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_065: [`disableTwinDesiredPropertiesUpdates` shall call its callback with an `Error` if an error is received while unsubscribing.]*/
it('calls its callback with an error if it fails to unsubscribe', function (testCallback) {
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
fakeMqttBase.unsubscribe = sinon.stub().callsArgWith(1, new Error('fake error'));
@ -764,7 +763,7 @@ describe('Mqtt', function () {
transport[testConfig.disableFeatureMethod](function (err) {
/*Tests_SRS_NODE_DEVICE_MQTT_16_042: [`disableC2D` shall unsubscribe from the topic for C2D messages.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_045: [`disableMethods` shall unsubscribe from the topic for direct methods.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_063: [`disableTwin` shall unsubscribe from the topics for twin messages.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_063: [`disableTwinDesiredPropertiesUpdates` shall unsubscribe from the topics for twin messages.]*/
assert.isTrue(fakeMqttBase.unsubscribe.called);
assert.instanceOf(err, Error);
testCallback();
@ -775,7 +774,7 @@ describe('Mqtt', function () {
/*Tests_SRS_NODE_DEVICE_MQTT_16_054: [`disableC2D` shall call its callback with no arguments when the `UNSUBACK` packet is received.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_055: [`disableMethods` shall call its callback with no arguments when the `UNSUBACK` packet is received.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_064: [`disableTwin` shall call its callback with no arguments when the `UNSUBACK` packet is received.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_064: [`disableTwinDesiredPropertiesUpdates` shall call its callback with no arguments when the `UNSUBACK` packet is received.]*/
it('unsubscribes and calls its callback', function (testCallback) {
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
transport.connect(function () {
@ -783,7 +782,7 @@ describe('Mqtt', function () {
transport[testConfig.disableFeatureMethod](function (err) {
/*Tests_SRS_NODE_DEVICE_MQTT_16_042: [`disableC2D` shall unsubscribe from the topic for C2D messages.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_045: [`disableMethods` shall unsubscribe from the topic for direct methods.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_063: [`disableTwin` shall unsubscribe from the topics for twin messages.]*/
/*Tests_SRS_NODE_DEVICE_MQTT_16_063: [`disableTwinDesiredPropertiesUpdates` shall unsubscribe from the topics for twin messages.]*/
assert.isTrue(fakeMqttBase.unsubscribe.called);
assert.isUndefined(err);
testCallback();
@ -793,4 +792,87 @@ describe('Mqtt', function () {
});
});
});
describe('#getTwin', function () {
/*Tests_SRS_NODE_DEVICE_MQTT_16_075: [`getTwin` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected.]*/
it('connects the transport if disconnected', function () {
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
transport.getTwin(function () {});
assert.isTrue(fakeMqttBase.connect.calledOnce);
});
/*Tests_SRS_NODE_DEVICE_MQTT_16_076: [`getTwin` shall call its callback with an error if it fails to connect the transport]*/
it('calls the callback with an error if the transport fails to connect', function (testCallback) {
fakeMqttBase.connect = sinon.stub().callsArgWith(1, new Error('fake'));
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
transport.getTwin(function (err) {
assert.instanceOf(err, Error);
testCallback();
});
});
/*Tests_SRS_NODE_DEVICE_MQTT_16_077: [`getTwin` shall call the `getTwin` method on the `MqttTwinClient` object and pass it its callback.]*/
it('calls getTwin on the MqttTwinClient object and passes its callback', function (testCallback) {
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
var fakeCallback = function () {};
sinon.spy(transport._twinClient, 'getTwin');
transport.connect(function () {
transport.getTwin(fakeCallback);
assert.isTrue(transport._twinClient.getTwin.calledOnce);
assert.isTrue(transport._twinClient.getTwin.calledWith(fakeCallback));
testCallback();
});
});
});
describe('#updateTwinReportedProperties', function () {
var fakePatch = {
fake: 'patch'
};
/*Tests_SRS_NODE_DEVICE_MQTT_16_078: [`updateTwinReportedProperties` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected.]*/
it('connects the transport if disconnected', function () {
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
transport.updateTwinReportedProperties(fakePatch, function () {});
assert.isTrue(fakeMqttBase.connect.calledOnce);
});
/*Tests_SRS_NODE_DEVICE_MQTT_16_079: [`updateTwinReportedProperties` shall call its callback with an error if it fails to connect the transport.]*/
it('calls the callback with an error if the transport fails to connect', function (testCallback) {
fakeMqttBase.connect = sinon.stub().callsArgWith(1, new Error('fake'));
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
transport.updateTwinReportedProperties(fakePatch, function (err) {
assert.instanceOf(err, Error);
testCallback();
});
});
/*Tests_SRS_NODE_DEVICE_MQTT_16_080: [`updateTwinReportedProperties` shall call the `updateTwinReportedProperties` method on the `MqttTwinClient` object and pass it its callback.]*/
it('calls updateTwinReportedProperties on the MqttTwinClient object and passes its callback', function (testCallback) {
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
var fakeCallback = function () {};
sinon.spy(transport._twinClient, 'updateTwinReportedProperties');
transport.connect(function () {
transport.updateTwinReportedProperties(fakePatch, fakeCallback);
assert.isTrue(transport._twinClient.updateTwinReportedProperties.calledOnce);
assert.isTrue(transport._twinClient.updateTwinReportedProperties.calledWith(fakePatch, fakeCallback));
testCallback();
});
});
});
describe('#on(\'twinDesiredPropertiesUpdate\'', function () {
/*Tests_SRS_NODE_DEVICE_MQTT_16_081: [The `Mqtt` constructor shall subscribe to the `MqttTwinClient` `twinDesiredPropertiesUpdates` and reemit them.]*/
it('reemits events emitted by the twin client', function (testCallback) {
var fakePatch = {
fake: 'patch'
};
var transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
transport.on('twinDesiredPropertiesUpdate', function (patch) {
assert.strictEqual(patch, fakePatch);
testCallback();
});
transport._twinClient.emit('twinDesiredPropertiesUpdate', fakePatch);
});
});
});

Просмотреть файл

@ -0,0 +1,444 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
var assert = require('chai').assert;
var EventEmitter = require('events').EventEmitter;
var MqttTwinClient = require('../lib/mqtt_twin_client.js').MqttTwinClient;
var MqttProvider = require('../../../../common/transport/mqtt/test/_fake_mqtt.js');
var sinon = require('sinon');
var provider;
var receiver;
var validateSubscription = function(shortname, topic, done) {
receiver.on(shortname, function() {});
process.nextTick(function() {
assert(provider.subscribe.withArgs(topic).calledOnce);
done();
});
};
var validateUnsubscription = function(shortname, topic, done) {
var func1 = function() { };
var func2 = function() { };
receiver.on(shortname, func1);
receiver.on(shortname, func2);
process.nextTick(function() {
receiver.removeListener(shortname, func1);
assert(provider.unsubscribe.withArgs(topic).notCalled);
receiver.removeListener(shortname, func2);
process.nextTick(function() {
assert(provider.unsubscribe.withArgs(topic).calledOnce);
done();
});
});
};
var validateEventFires = function(shortname, topic, done) {
var handler = sinon.spy();
receiver.on(shortname, handler);
provider.fakeMessageFromService(topic, 'fake_body');
process.nextTick(function() {
assert(handler.calledOnce);
provider.fakeMessageFromService(topic, 'fake_body');
process.nextTick(function() {
assert(handler.calledTwice);
done();
});
});
};
describe('MqttTwinClient', function () {
beforeEach(function(done) {
provider = new MqttProvider();
receiver = new MqttTwinClient(provider);
done();
});
describe('#constructor', function () {
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_001: [The `MqttTwinClient` constructor shall accept a `client` object]*/
it ('accepts a config object', function() {
assert.equal(receiver._mqtt, provider);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_001: [The `MqttTwinClient` constructor shall immediately subscribe to the `message` event of the `client` object.]*/
it('subscribes to the message event', function () {
var fakeClient = new EventEmitter();
sinon.spy(fakeClient, 'on');
var twinClient = new MqttTwinClient(fakeClient);
assert.isTrue(fakeClient.on.calledWith('message'));
});
});
describe('#getTwin', function () {
var fakeMqttBase;
beforeEach(function () {
fakeMqttBase = new EventEmitter();
fakeMqttBase.connect = sinon.stub().callsArg(1);
fakeMqttBase.disconnect = sinon.stub().callsArg(0);
fakeMqttBase.publish = sinon.stub().callsArg(3);
fakeMqttBase.subscribe = sinon.stub().callsArg(2);
fakeMqttBase.unsubscribe = sinon.stub().callsArg(1);
fakeMqttBase.updateSharedAccessSignature = sinon.stub().callsArg(1);
});
afterEach(function () {
fakeMqttBase = null;
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_002: [The `getTwin` method shall subscribe to the `$iothub/twin/res/#` topic if not already subscribed.]*/
it('subscribes to the response topic', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.getTwin(function () {});
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
assert.isTrue(fakeMqttBase.subscribe.calledWith('$iothub/twin/res/#'));
});
it('does not subscribe twice to the response topic', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.getTwin(function () {});
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
twinClient.getTwin(function () {});
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_009: [If subscribing to the response topic fails, the callback shall be called with the translated version of the error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
it('calls its callback with an error if subscribing fails', function (testCallback) {
var fakeError = new Error('failed to subscribe');
fakeMqttBase.subscribe = sinon.stub().callsArgWith(2, fakeError);
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.getTwin(function (err) {
assert.instanceOf(err, Error);
assert.strictEqual(err.transportError, fakeError);
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
testCallback();
});
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_003: [The `getTwin` method shall publish the request message on the `$iothub/twin/get/?rid=<requestId>` topic using the `MqttBase.publish` method.]*/
it('publishes the request on the $iothub/twin/get/ topic', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.getTwin(function () {});
assert.isTrue(fakeMqttBase.publish.calledOnce);
assert.strictEqual(fakeMqttBase.publish.firstCall.args[0].indexOf('$iothub/twin/GET/'), 0);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_004: [The `getTwin` method shall publish the request message with QoS=0, DUP=0 and Retain=0.]*/
it('publishes the request with qos = 0 and retail = false', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.getTwin(function () {});
assert.strictEqual(fakeMqttBase.publish.firstCall.args[2].qos, 0);
assert.strictEqual(fakeMqttBase.publish.firstCall.args[2].retain, false);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_005: [The `requestId` property in the topic querystring should be set to a random unique integer that will be used to identify the response later on.]*/
it('publishes the request with an integer that is a requestId', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.getTwin(function () {});
var firstRequestId = fakeMqttBase.publish.firstCall.args[0].split('=')[1];
assert.isNumber(parseInt(firstRequestId));
twinClient.getTwin(function () {});
var secondRequestId = fakeMqttBase.publish.secondCall.args[0].split('=')[1];
assert.isNumber(parseInt(secondRequestId));
assert.notEqual(firstRequestId, secondRequestId);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_006: [The request message published by the `getTwin` method shall have an empty body.]*/
it('publishes the request with an empty body', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.getTwin(function () {});
assert.strictEqual(fakeMqttBase.publish.firstCall.args[1], ' ');
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_007: [When a message is received on the response topic with an `$rid` property in the query string of the topic matching the one that was sent on the request topic, the `callback` shall be called with a `null` error object and the parsed content of the response message.]*/
it('calls its callback with the response when it is received', function (testCallback) {
var fakeTwin = {
fake: 'twin'
};
fakeMqttBase.publish = sinon.stub().callsFake(function (topic, body, options, callback) {
var requestId = parseInt(topic.split('=')[1]);
var fakeResponseTopic = '$iothub/twin/res/200?$rid=' + requestId;
callback();
fakeMqttBase.emit('message', fakeResponseTopic, JSON.stringify(fakeTwin));
});
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.getTwin(function (err, twin) {
assert.isNull(err);
assert.deepEqual(twin, fakeTwin);
testCallback();
});
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_008: [If an error happen while publishing the request message, the `callback` shall be called with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
it('calls its callback with an error if publish fails', function (testCallback) {
var fakeError = new Error('failed to publish');
fakeMqttBase.publish = sinon.stub().callsArgWith(3, fakeError);
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.getTwin(function (err, twin) {
assert.instanceOf(err, Error);
assert.strictEqual(err.transportError, fakeError);
testCallback();
});
});
});
describe('#updateTwinReportedProperties', function () {
var fakeMqttBase;
beforeEach(function () {
fakeMqttBase = new EventEmitter();
fakeMqttBase.connect = sinon.stub().callsArg(1);
fakeMqttBase.disconnect = sinon.stub().callsArg(0);
fakeMqttBase.publish = sinon.stub().callsArg(3);
fakeMqttBase.subscribe = sinon.stub().callsArg(2);
fakeMqttBase.unsubscribe = sinon.stub().callsArg(1);
fakeMqttBase.updateSharedAccessSignature = sinon.stub().callsArg(1);
});
afterEach(function () {
fakeMqttBase = null;
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_011: [The `updateTwinReportedProperties` method shall subscribe to the `$iothub/twin/res/#` topic if not already subscribed.]*/
it('subscribes to the response topic', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.updateTwinReportedProperties({ fake: 'patch' }, function () {});
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
assert.isTrue(fakeMqttBase.subscribe.calledWith('$iothub/twin/res/#'));
});
it('does not subscribe twice to the response topic', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.updateTwinReportedProperties({ fake: 'patch' }, function () {});
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
twinClient.updateTwinReportedProperties({ fake: 'patch' }, function () {});
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_012: [If subscribing to the response topic fails, the callback shall be called with the translated version of the error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
it('calls its callback with an error if subscribing fails', function (testCallback) {
var fakeError = new Error('failed to subscribe');
fakeMqttBase.subscribe = sinon.stub().callsArgWith(2, fakeError);
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.updateTwinReportedProperties({ fake: 'patch' }, function (err) {
assert.instanceOf(err, Error);
assert.strictEqual(err.transportError, fakeError);
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
testCallback();
});
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_013: [The `updateTwinReportedProperties` method shall publish the request message on the `$iothub/twin/patch/properties/reported/?rid=<requestId>` topic using the `MqttBase.publish` method.]*/
it('publishes the request on the $iothub/twin/patch/properties/reported topic', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.updateTwinReportedProperties({ fake: 'patch' }, function () {});
assert.isTrue(fakeMqttBase.publish.calledOnce);
assert.strictEqual(fakeMqttBase.publish.firstCall.args[0].indexOf('$iothub/twin/PATCH/properties/reported/'), 0);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_014: [The `updateTwinReportedProperties` method shall publish the request message with QoS=0, DUP=0 and Retain=0.]*/
it('publishes the request with qos = 0 and retail = false', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.updateTwinReportedProperties({ fake: 'patch' }, function () {});
assert.strictEqual(fakeMqttBase.publish.firstCall.args[2].qos, 0);
assert.strictEqual(fakeMqttBase.publish.firstCall.args[2].retain, false);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_015: [The `requestId` property in the topic querystring should be set to a random unique integer that will be used to identify the response later on.]*/
it('publishes the request with an integer that is a requestId', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.updateTwinReportedProperties({ fake: 'patch' }, function () {});
var firstRequestId = fakeMqttBase.publish.firstCall.args[0].split('=')[1];
assert.isNumber(parseInt(firstRequestId));
twinClient.updateTwinReportedProperties({ fake: 'patch' }, function () {});
var secondRequestId = fakeMqttBase.publish.secondCall.args[0].split('=')[1];
assert.isNumber(parseInt(secondRequestId));
assert.notEqual(firstRequestId, secondRequestId);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_016: [The body of the request message published by the `updateTwinReportedProperties` method shall be a JSON string of the reported properties patch.]*/
it('publishes the request with a body that is the stringified version of the patch', function () {
var fakePatch = { fake: 'patch' };
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.updateTwinReportedProperties(fakePatch, function () {});
assert.strictEqual(fakeMqttBase.publish.firstCall.args[1], JSON.stringify(fakePatch));
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_017: [When a message is received on the response topic with an `$rid` property in the query string of the topic matching the one that was sent on the request topic, the `callback` shall be called with a `null` error object.]*/
it('calls its callback with the response when it is received', function (testCallback) {
var fakeTwin = {
fake: 'twin'
};
fakeMqttBase.publish = sinon.stub().callsFake(function (topic, body, options, callback) {
var requestId = parseInt(topic.split('=')[1]);
var fakeResponseTopic = '$iothub/twin/res/200?$rid=' + requestId;
callback();
fakeMqttBase.emit('message', fakeResponseTopic, JSON.stringify(fakeTwin));
});
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.updateTwinReportedProperties({ fake: 'patch' }, function (err, twin) {
assert.isNull(err);
assert.deepEqual(twin, fakeTwin);
testCallback();
});
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_018: [If an error happen while publishing the request message, the `callback` shall be called with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
it('calls its callback with an error if publish fails', function (testCallback) {
var fakeError = new Error('failed to publish');
fakeMqttBase.publish = sinon.stub().callsArgWith(3, fakeError);
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.updateTwinReportedProperties({ fake: 'patch' }, function (err, twin) {
assert.instanceOf(err, Error);
assert.strictEqual(err.transportError, fakeError);
testCallback();
});
});
});
describe('#enableTwinDesiredPropertiesUpdates', function () {
var fakeMqttBase;
beforeEach(function () {
fakeMqttBase = new EventEmitter();
fakeMqttBase.connect = sinon.stub().callsArg(1);
fakeMqttBase.disconnect = sinon.stub().callsArg(0);
fakeMqttBase.publish = sinon.stub().callsArg(3);
fakeMqttBase.subscribe = sinon.stub().callsArg(2);
fakeMqttBase.unsubscribe = sinon.stub().callsArg(1);
fakeMqttBase.updateSharedAccessSignature = sinon.stub().callsArg(1);
});
afterEach(function () {
fakeMqttBase = null;
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_019: [The `enableTwinDesiredPropertiesUpdates` shall subscribe to the `$iothub/twin/PATCH/properties/desired/#` topic using the `MqttBase.subscribe` method if it hasn't been subscribed to already.]*/
it('subscribes to the desired properties updates topic', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.enableTwinDesiredPropertiesUpdates(function () {});
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
assert.isTrue(fakeMqttBase.subscribe.calledWith('$iothub/twin/PATCH/properties/desired/#'));
});
it('subscribes to the desired properties updates topic only once', function () {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.enableTwinDesiredPropertiesUpdates(function () {});
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
twinClient.enableTwinDesiredPropertiesUpdates(function () {});
assert.isTrue(fakeMqttBase.subscribe.calledOnce);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_020: [The `enableTwinDesiredPropertiesUpdates` shall call its callback with no arguments if the subscription is successful.]*/
it('calls its callback with no argument if the subscription is successful', function (testCallback) {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.enableTwinDesiredPropertiesUpdates(testCallback);
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_021: [if subscribing fails with an error the `enableTwinDesiredPropertiesUpdates` shall call its callback with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
it('calls its callback with an error if subscribing fails', function (testCallback) {
var fakeError = new Error('fake');
fakeMqttBase.subscribe = sinon.stub().callsArgWith(2, fakeError);
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.enableTwinDesiredPropertiesUpdates(function (err) {
assert.instanceOf(err, Error);
assert.strictEqual(err.transportError, fakeError);
testCallback();
});
});
});
describe('#disableTwinDesiredPropertiesUpdates', function () {
var fakeMqttBase;
beforeEach(function () {
fakeMqttBase = new EventEmitter();
fakeMqttBase.connect = sinon.stub().callsArg(1);
fakeMqttBase.disconnect = sinon.stub().callsArg(0);
fakeMqttBase.publish = sinon.stub().callsArg(3);
fakeMqttBase.subscribe = sinon.stub().callsArg(2);
fakeMqttBase.unsubscribe = sinon.stub().callsArg(1);
fakeMqttBase.updateSharedAccessSignature = sinon.stub().callsArg(1);
});
afterEach(function () {
fakeMqttBase = null;
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_022: [The `disableTwinDesiredPropertiesUpdates` shall unsubscribe from the `$iothub/twin/PATCH/properties/desired/#` topic using the `MqttBase.unsubscribe` method if it hasn't been unsubscribed from already.]*/
it('unsubscribes from the desired properties update topic', function (testCallback) {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.enableTwinDesiredPropertiesUpdates(function () {
twinClient.disableTwinDesiredPropertiesUpdates(function() {});
assert.isTrue(fakeMqttBase.unsubscribe.calledOnce);
assert.isTrue(fakeMqttBase.unsubscribe.calledWith('$iothub/twin/PATCH/properties/desired/#'));
testCallback();
});
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_023: [The `disableTwinDesiredPropertiesUpdates` shall call its callback with no arguments if the unsubscription is successful.]*/
it('calls its callback with no arguments if successful', function (testCallback) {
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.enableTwinDesiredPropertiesUpdates(function () {
twinClient.disableTwinDesiredPropertiesUpdates(testCallback);
});
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_024: [if unsubscribing fails with an error the `disableTwinDesiredPropertiesUpdates` shall call its callback with the translated version of this error obtained by using the `translateError` method of the `azure-iot-mqtt-base` package.]*/
it('calls its callback with an error if it fails to unsubscribe', function (testCallback) {
var fakeError = new Error('fake');
fakeMqttBase.unsubscribe = sinon.stub().callsArgWith(1, fakeError);
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.enableTwinDesiredPropertiesUpdates(function () {
twinClient.disableTwinDesiredPropertiesUpdates(function (err) {
assert.instanceOf(err, Error);
assert.strictEqual(err.transportError, fakeError);
testCallback();
});
});
});
});
describe('#on(\'twinDesiredPropertiesUpdate\'', function () {
var fakeMqttBase;
beforeEach(function () {
fakeMqttBase = new EventEmitter();
fakeMqttBase.connect = sinon.stub().callsArg(1);
fakeMqttBase.disconnect = sinon.stub().callsArg(0);
fakeMqttBase.publish = sinon.stub().callsArg(3);
fakeMqttBase.subscribe = sinon.stub().callsArg(2);
fakeMqttBase.unsubscribe = sinon.stub().callsArg(1);
fakeMqttBase.updateSharedAccessSignature = sinon.stub().callsArg(1);
});
afterEach(function () {
fakeMqttBase = null;
});
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_025: [Once the desired properties update topic has been subscribed to the `MqttTwinClient` shall emit a `twinDesiredPropertiesUpdate` event for messages received on that topic.]*/
it('emits twinDesiredPropertiesUpdate events', function (testCallback) {
var fakePatch = {
fake: 'patch'
};
var twinClient = new MqttTwinClient(fakeMqttBase);
twinClient.on('twinDesiredPropertiesUpdate', function (patch) {
/*Tests_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_026: [The argument of the `twinDesiredPropertiesUpdate` event emitted shall be the object parsed from the JSON string contained in the received message.]*/
assert.deepEqual(patch, fakePatch);
testCallback();
});
fakeMqttBase.emit('message', '$iothub/twin/PATCH/properties/desired/', JSON.stringify(fakePatch));
});
});
});

Просмотреть файл

@ -1,297 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
var assert = require('chai').assert;
var MqttTwinReceiver = require('../lib/mqtt_twin_receiver.js').MqttTwinReceiver;
var MqttProvider = require('../../../../common/transport/mqtt/test/_fake_mqtt.js');
var sinon = require('sinon');
var provider;
var receiver;
var validateSubscription = function(shortname, topic, done) {
receiver.on(shortname, function() {});
process.nextTick(function() {
assert(provider.subscribe.withArgs(topic).calledOnce);
done();
});
};
var validateUnsubscription = function(shortname, topic, done) {
var func1 = function() { };
var func2 = function() { };
receiver.on(shortname, func1);
receiver.on(shortname, func2);
process.nextTick(function() {
receiver.removeListener(shortname, func1);
assert(provider.unsubscribe.withArgs(topic).notCalled);
receiver.removeListener(shortname, func2);
process.nextTick(function() {
assert(provider.unsubscribe.withArgs(topic).calledOnce);
done();
});
});
};
var validateEventFires = function(shortname, topic, done) {
var handler = sinon.spy();
receiver.on(shortname, handler);
provider.fakeMessageFromService(topic, 'fake_body');
process.nextTick(function() {
assert(handler.calledOnce);
provider.fakeMessageFromService(topic, 'fake_body');
process.nextTick(function() {
assert(handler.calledTwice);
done();
});
});
};
describe('MqttTwinReceiver', function () {
beforeEach(function(done) {
provider = new MqttProvider();
receiver = new MqttTwinReceiver(provider);
done();
});
describe('#constructor', function () {
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_001: [** The `MqttTwinReceiver` constructor shall accept a `client` object **]** */
it ('accepts a config object', function() {
assert.equal(receiver._mqtt, provider);
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_002: [** The `MqttTwinReceiver` constructor shall throw `ReferenceError` if the `client` object is falsy **]** */
it ('throws if client is falsy', function() {
assert.throws(function() {
receiver = new MqttTwinReceiver();
}, ReferenceError);
});
});
describe('response event', function() {
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_003: [** When a listener is added for the `response` event, the appropriate topic shall be asynchronously subscribed to. **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_009: [** The subscribed topic for `response` events shall be '$iothub/twin/res/#' **]** */
it ('asynchronously subscribes when a listener is added', function(done) {
validateSubscription(MqttTwinReceiver.responseEvent, '$iothub/twin/res/#', done);
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_005: [** When there are no more listeners for the `response` event, the topic should be unsubscribed **]** */
it ('unsubscribes when there are no more listeners', function(done) {
validateUnsubscription(MqttTwinReceiver.responseEvent, '$iothub/twin/res/#', done);
});
it('emits an error if unsubscribing fails with an error', function (testCallback) {
var fakeError = new Error('fake');
provider.unsubscribe = sinon.stub().callsFake(function (topic, callback) {
callback(fakeError);
});
var errCallback = function (err) {
assert.strictEqual(err, fakeError);
receiver.removeListener('error', errCallback);
testCallback();
};
var fakeCallback = function () {};
receiver.on('error', errCallback);
receiver.on(MqttTwinReceiver.responseEvent, fakeCallback);
receiver.removeListener(MqttTwinReceiver.responseEvent, fakeCallback);
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_004: [** If there is a listener for the `response` event, a `response` event shall be emitted for each response received. **]** */
it ('emits a response event for every message', function(done) {
validateEventFires(MqttTwinReceiver.responseEvent, '$iothub/twin/res/200/?$rid=42', done);
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_006: [** When a `response` event is emitted, the parameter shall be an object which contains `status`, `requestId` and `body` members **]** */
it ('has a parameter with an object with the correct properties', function(done) {
receiver.on(MqttTwinReceiver.responseEvent, function(data) {
assert.isNumber(data.status);
assert.isDefined(data.$rid);
assert.isDefined(data.body);
done();
});
provider.fakeMessageFromService('$iothub/twin/res/200/?$rid=42', 'fake_body');
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_010: [** The topic which receives the response shall be formatted as '$iothub/twin/res/{status}/?$rid={request id}' **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_011: [** The `status` parameter of the `response` event shall be parsed out of the topic name from the {status} field **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_012: [** The `requestId` parameter of the `response` event shall be parsed out of the topic name from the {request id} field **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_013: [** The `body` parameter of the `response` event shall be the body of the received MQTT message **]** */
it ('parses the topic name correctly', function(done) {
receiver.on(MqttTwinReceiver.responseEvent, function(data) {
assert.strictEqual(data.status, 200);
assert.strictEqual(data.$rid, '42');
assert.strictEqual(data.body, 'fake_body');
done();
});
provider.fakeMessageFromService('$iothub/twin/res/200/?$rid=42', 'fake_body');
});
it ('ignores messages on invalid topics', function(done) {
receiver.on(MqttTwinReceiver.responseEvent, function() {
assert.fail();
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_014: [** Any messages received on topics which violate the topic name formatting shall be ignored. **]** */
provider.fakeMessageFromService('garbage');
provider.fakeMessageFromService('$iothub/twin/PATCH/properties/desired/');
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_015: [** the {status} and {request id} fields in the topic name are required. **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_016: [** The {status} and {request id} fields in the topic name shall be strings **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_017: [** The {status} and {request id} fields in the topic name cannot be zero length. **]** */
provider.fakeMessageFromService('$iothub/twin/res/');
provider.fakeMessageFromService('$iothub/twin/res/200/$rid=');
provider.fakeMessageFromService('$iothub/twin/res//$rid=10');
provider.fakeMessageFromService('$iothub/twin/res//$rid=');
process.nextTick(done);
});
});
describe('post event', function() {
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_018: [** When a listener is added to the post event, the appropriate topic shall be asynchronously subscribed to. **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_019: [** The subscribed topic for post events shall be $iothub/twin/PATCH/properties/reported/# **]** */
it ('asynchronously subscribes when a listener is added', function(done) {
validateSubscription(MqttTwinReceiver.postEvent, '$iothub/twin/PATCH/properties/desired/#', done);
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_021: [** When there are no more listeners for the post event, the topic should be unsubscribed. **]** */
it ('unsubscribes when there are no more listeners', function(done) {
validateUnsubscription(MqttTwinReceiver.postEvent, '$iothub/twin/PATCH/properties/desired/#', done);
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_020: [** If there is a listener for the post event, a post event shal be emitteded for each post message received **]** */
it ('emits a response for every message', function(done) {
validateEventFires(MqttTwinReceiver.postEvent, '$iothub/twin/PATCH/properties/desired/#', done);
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_022: [** When a post event it emitted, the parameter shall be the body of the message **]** */
it ('passes the body as a parameter of the event', function(done) {
receiver.on(MqttTwinReceiver.postEvent, function(data) {
assert.equal(data, 'fake_body');
done();
});
provider.fakeMessageFromService('$iothub/twin/PATCH/properties/desired/', 'fake_body');
});
});
describe('error event', function() {
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_023: [** If the `error` event is subscribed to, an `error` event shall be emitted if any asynchronous subscribing operations fails. **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_024: [** When the `error` event is emitted, the first parameter shall be an error object obtained via the MQTT `translateErrror` module. **]** */
it ('emits an error if subscribing to response event fails', function(done) {
provider.subscribeShouldFail = true;
receiver.on('error', function(err) {
if (err.constructor.name === 'UnauthorizedError') {
done();
}
});
receiver.on(MqttTwinReceiver.responseEvent, function() {});
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_023: [** If the `error` event is subscribed to, an `error` event shall be emitted if any asynchronous subscribing operations fails. **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_024: [** When the `error` event is emitted, the first parameter shall be an error object obtained via the MQTT `translateErrror` module. **]** */
it ('emits an error if subscribing to post event fails', function(done) {
provider.subscribeShouldFail = true;
receiver.on('error', function(err) {
if (err.constructor.name === 'UnauthorizedError') {
done();
}
});
receiver.on(MqttTwinReceiver.postEvent, function() {});
});
});
describe('subscribed event', function() {
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_025: [** If the `subscribed` event is subscribed to, a `subscribed` event shall be emitted after an MQTT topic is subscribed to. **]** */
it ('emits a subscribed event after successful subscription to response event', function(done) {
var evtCallback = function() {
receiver.removeListener('subscribed', evtCallback);
done();
};
receiver.on('subscribed', evtCallback);
receiver.on(MqttTwinReceiver.responseEvent, function() {});
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_025: [** If the `subscribed` event is subscribed to, a `subscribed` event shall be emitted after an MQTT topic is subscribed to. **]** */
it ('emits a subscribed event after successful subscription to post event', function(done) {
var evtCallback = function() {
receiver.removeListener('subscribed', evtCallback);
done();
};
receiver.on('subscribed', evtCallback);
receiver.on(MqttTwinReceiver.postEvent, function() {});
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_028: [** When the `subscribed` event is emitted, the parameter shall be an object which contains an `eventName` field and an `transportObject` field. **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_026: [** When the `subscribed` event is emitted because the response MQTT topic was subscribed, the `eventName` field shall be the string 'response' **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_029: [** When the subscribed event is emitted, the `transportObject` field shall contain the object returned by the library in the subscription response. **]** */
it ('passes the the word "response" as an event parameter when the response topic is successfully subscribed to', function(done) {
receiver.on('subscribed', function(response) {
if ((response.eventName === MqttTwinReceiver.responseEvent) && (response.transportObject === 'fake_object')) {
done();
}
});
receiver.on(MqttTwinReceiver.responseEvent, function() {});
});
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_028: [** When the `subscribed` event is emitted, the parameter shall be an object which contains an `eventName` field and an `transportObject` field. **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_027: [** When the `subscribed` event is emitted because the post MQTT topic was subscribed, the `eventName` field shall be the string 'post' **]** */
/* Tests_SRS_NODE_DEVICE_MQTT_TWIN_RECEIVER_18_029: [** When the subscribed event is emitted, the `transportObject` field shall contain the object returned by the library in the subscription response. **]** */
it ('passes the the word "post" as an event parameter when the post topic is successfully subscribed to', function(done) {
receiver.on('subscribed', function(response) {
if ((response.eventName === MqttTwinReceiver.postEvent) && (response.transportObject === 'fake_object')) {
done();
}
});
receiver.on(MqttTwinReceiver.postEvent, function() {});
});
});
describe('#subscribe', function () {
it('subscribes to the response and post events', function (testCallback) {
receiver.subscribe(function () {
assert.isTrue(provider.subscribe.calledTwice);
assert(provider.subscribe.calledWith('$iothub/twin/res/#'));
assert(provider.subscribe.calledWith('$iothub/twin/PATCH/properties/desired/#'));
testCallback();
});
});
it('should call its callback immediately if already subscribed', function (testCallback) {
receiver.subscribe(function () {
assert.isTrue(provider.subscribe.calledTwice);
receiver.subscribe(function () {
assert.isTrue(provider.subscribe.calledTwice); // still the same number of calls - we didn't try to resubscribe
testCallback();
});
});
});
});
describe('#unsubscribe', function () {
it('unsubscribes to the response and post events', function (testCallback) {
receiver.subscribe(function () {
receiver.unsubscribe(function () {
assert.isTrue(provider.unsubscribe.calledTwice);
assert(provider.unsubscribe.calledWith('$iothub/twin/res/#'));
assert(provider.unsubscribe.calledWith('$iothub/twin/PATCH/properties/desired/#'));
testCallback();
});
});
});
it('should call its callback immediately if already unsubscribed', function (testCallback) {
receiver.unsubscribe(function () {
assert.isTrue(provider.unsubscribe.notCalled);
testCallback();
});
});
});
});

Просмотреть файл

@ -139,18 +139,18 @@ delete nullMergeResult.tweedle;
it('relies on $version starting at 1 and incrementing by 1 each time', function(done) {
assert.equal(deviceTwin.properties.desired.$version,1);
deviceTwin.on('properties.desired', function() {
if (deviceTwin.properties.desired.$version === 1) {
// ignore $update === 1. assert needed to make jshint happy
assert(true);
} else if (deviceTwin.properties.desired.$version === 2) {
done();
} else {
done(new Error('incorrect property version received - ' + deviceTwin.properties.desired.$version));
}
});
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
if (err) return done(err);
deviceTwin.on('properties.desired', function() {
if (deviceTwin.properties.desired.$version === 1) {
// ignore $update === 1. assert needed to make jshint happy
assert(true);
} else if (deviceTwin.properties.desired.$version === 2) {
done();
} else {
done(new Error('incorrect property version received - ' + deviceTwin.properties.desired.$version));
}
});
});
});
@ -190,27 +190,38 @@ delete nullMergeResult.tweedle;
var sendsAndReceivesDesiredProperties = function(done) {
assertObjectIsEmpty(deviceTwin.properties.desired);
deviceTwin.on('properties.desired', function(props) {
if (props.$version === 1) {
// ignore
assert(true);
} else if (props.$version === 2) {
assertObjectsAreEqual(newProps, deviceTwin.properties.desired);
done();
} else {
done(new Error('incorrect property version received - ' + props.$version));
}
});
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
if (err) return done(err);
deviceTwin.on('properties.desired', function(props) {
if (props.$version === 1) {
// ignore
assert(true);
} else if (props.$version === 2) {
assertObjectsAreEqual(newProps, deviceTwin.properties.desired);
done();
} else {
done(new Error('incorrect property version received - ' + props.$version));
}
});
});
};
it('sends and receives desired properties', sendsAndReceivesDesiredProperties);
var mergeDesiredProperties = function(first, second, newEtag, result, done) {
deviceTwin.on('properties.desired', function(props) {
if (props.$version === 1 || props.$version === 2) {
// ignore
assert(true);
} else if (props.$version === 3) {
assertObjectsAreEqual(deviceTwin.properties.desired, result);
done();
} else {
done(new Error('incorrect property version received - ' + props.$version));
}
});
serviceTwin.update( { properties : { desired : first } }, function(err) {
if (err) return done(err);
@ -222,18 +233,6 @@ delete nullMergeResult.tweedle;
serviceTwin.update( { properties : { desired : second } }, function(err) {
if (err) return done(err);
deviceTwin.on('properties.desired', function(props) {
if (props.$verion === 1 || props.$version === 2) {
// ignore
assert(true);
} else if (props.$version === 3) {
assertObjectsAreEqual(deviceTwin.properties.desired, result);
done();
} else {
done(new Error('incorrect property version received - ' + props.$version));
}
});
});
});
};