Merge changes from the MQTT transport and device client twin refactoring
This commit is contained in:
Коммит
238dee83ce
|
@ -248,6 +248,8 @@ interface DeviceMethodEventHandler {
|
|||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_086: [** Any operation (such as `sendEvent` or `onDeviceMethod`) happening after a `setRetryPolicy` call should use the policy set during that call. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_096: [** The `setRetryPolicy` method shall call the `setRetryPolicy` method on the twin if it is set and pass it the `policy` object. **]**
|
||||
|
||||
### Events
|
||||
#### message
|
||||
|
||||
|
@ -270,3 +272,15 @@ interface DeviceMethodEventHandler {
|
|||
#### disconnect
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_019: [** The `disconnect` event shall be emitted when the client is disconnected from the server. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_097: [** If the transport emits a `disconnect` event the client while subscribed to C2D messages the retry policy shall try to re-enable the C2D functionality using the transport `enableC2D` method. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_102: [** If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_098: [** If the transport emits a `disconnect` event the client while subscribed to direct methods the retry policy shall try to re-enable the direct methods functionality using the transport `enableMethods` method. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_100: [** If the retry policy fails to reestablish the direct methods functionality a `disconnect` event shall be emitted with a `results.Disconnected` object. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_099: [** If the transport emits a `disconnect` event the client while subscribed to desired properties updates the retry policy shall try to re-enable the twin desired properties updates using the transport `enableTwinDesiredPropertiesUpdates` method. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_101: [** If the retry policy fails to reestablish the twin desired properties updates functionality a `disconnect` event shall be emitted with a `results.Disconnected` object. **]**
|
|
@ -7,7 +7,7 @@ azure-iot-device.Twin provides access to the Device Twin functionaliy of IoTHub.
|
|||
|
||||
```typescript
|
||||
class Twin extends EventEmitter {
|
||||
constructor(transport: Client.Transport);
|
||||
constructor(transport: Client.Transport, retryPolicy: RetryPolicy, maxOperationTimeout: number);
|
||||
get(callback: (err: Error, twin?: Twin) => void): void;
|
||||
properties: {
|
||||
reported: {
|
||||
|
@ -62,6 +62,16 @@ class Twin extends EventEmitter {
|
|||
|
||||
**SRS_NODE_DEVICE_TWIN_16_013: [** Recursively for each desired property that is part of the patch received, an event named using the convention `properties.desired[.path]` shall be fired with an argument containing the value of the property. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_TWIN_18_045: [** If a property is already set when a handler is added for that property, the `Twin` object shall fire a property changed event for the property. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_TWIN_16_012: [** When a `twinDesiredPropertiesUpdates` event is emitted by the transport, the property patch passed as argument to the event handler shall be merged with the current desired properties. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_TWIN_16_013: [** Recursively for each desired property that is part of the patch received, an event named using the convention `properties.desired[.path]` shall be fired with an argument containing the value of the property. **]**
|
||||
|
||||
### setRetryPolicy(retryPolicy: RetryPolicy): void
|
||||
|
||||
**SRS_NODE_DEVICE_TWIN_16_014: [** the `retryPolicy` object passed to the `setRetryPolicy` method shall be used to retry any subsequent operation. **]**
|
||||
|
||||
|
||||
## Implementation notes
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@
|
|||
"alltest": "istanbul cover node_modules/mocha/bin/_mocha -- --reporter spec \"test/**/_*_test*.js\"",
|
||||
"ci": "npm -s run lint && npm -s run build && npm -s run alltest-min && npm -s run check-cover",
|
||||
"test": "npm -s run lint && npm -s run build && npm -s run unittest",
|
||||
"check-cover": "istanbul check-coverage --statements 95 --branches 85 --lines 96 --functions 91"
|
||||
"check-cover": "istanbul check-coverage --statements 96 --branches 88 --lines 98 --functions 93"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">= 0.10"
|
||||
|
|
|
@ -125,21 +125,36 @@ export class Client extends EventEmitter {
|
|||
this._disconnectHandler = (err) => {
|
||||
debug('transport disconnect event: ' + (err ? err.toString() : 'no error'));
|
||||
if (err && this._retryPolicy.shouldRetry(err)) {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_097: [If the transport emits a `disconnect` event the client while subscribed to C2D messages the retry policy shall try to re-enable the C2D functionality using the transport `enableC2D` method.]*/
|
||||
if (this._c2dEnabled) {
|
||||
this._c2dEnabled = false;
|
||||
debug('re-enabling C2D link');
|
||||
this._enableC2D((err) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_102: [If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
|
||||
this.emit('disconnect', new results.Disconnected(err));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_098: [If the transport emits a `disconnect` event the client while subscribed to direct methods the retry policy shall try to re-enable the direct methods functionality using the transport `enableMethods` method.]*/
|
||||
if (this._methodsEnabled) {
|
||||
this._methodsEnabled = false;
|
||||
debug('re-enabling Methods link');
|
||||
this._enableMethods((err) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_100: [If the retry policy fails to reestablish the direct methods functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
|
||||
this.emit('disconnect', new results.Disconnected(err));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_099: [If the transport emits a `disconnect` event the client while subscribed to desired properties updates the retry policy shall try to re-enable the twin desired properties updates using the transport `enableTwinDesiredPropertiesUpdates` method.]*/
|
||||
if (this._twin && this._twin.desiredPropertiesUpdatesEnabled) {
|
||||
debug('re-enabling Twin');
|
||||
this._twin.enableTwinDesiredPropertiesUpdates((err) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_101: [If the retry policy fails to reestablish the twin desired properties updates functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
|
||||
this.emit('disconnect', new results.Disconnected(err));
|
||||
}
|
||||
});
|
||||
|
@ -445,7 +460,7 @@ export class Client extends EventEmitter {
|
|||
getTwin(done: (err?: Error, twin?: Twin) => void, twin?: Twin): void {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_094: [If this is the first call to `getTwin` the method shall instantiate a new `Twin` object and pass it the transport currently in use.]*/
|
||||
if (!this._twin) {
|
||||
this._twin = new Twin(this._transport);
|
||||
this._twin = new Twin(this._transport, this._retryPolicy, this._maxOperationTimeout);
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_095: [The `getTwin` method shall call the `get()` method on the `Twin` object currently in use and pass it its `done` argument for a callback.]*/
|
||||
|
@ -470,6 +485,11 @@ export class Client extends EventEmitter {
|
|||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_086: [Any operation happening after a `setRetryPolicy` call should use the policy set during that call.]*/
|
||||
this._retryPolicy = policy;
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_096: [The `setRetryPolicy` method shall call the `setRetryPolicy` method on the twin if it is set and pass it the `policy` object.]*/
|
||||
if (this._twin) {
|
||||
this._twin.setRetryPolicy(policy);
|
||||
}
|
||||
}
|
||||
|
||||
private _validateDeviceMethodInputs(methodName: string, callback: (request: DeviceMethodRequest, response: DeviceMethodResponse) => void): void {
|
||||
|
|
|
@ -8,6 +8,7 @@ import * as traverse from 'traverse';
|
|||
import * as dbg from 'debug';
|
||||
const debug = dbg('azure-iot-device:Twin');
|
||||
|
||||
import { RetryPolicy, RetryOperation } from 'azure-iot-common';
|
||||
import { Client } from './client';
|
||||
|
||||
/**
|
||||
|
@ -39,7 +40,10 @@ export class Twin extends EventEmitter {
|
|||
* The desired and reported properties dictionnaries (respectively in `properties.desired` and `properties.reported`).
|
||||
*/
|
||||
properties: TwinProperties;
|
||||
desiredPropertiesUpdatesEnabled: boolean;
|
||||
private _transport: Client.Transport;
|
||||
private _retryPolicy: RetryPolicy;
|
||||
private _maxOperationTimeout: number;
|
||||
|
||||
/**
|
||||
* The constructor should not be used directly and instead the SDK user should use the {@link Client#getTwin} method to obtain a valid `Twin` object.
|
||||
|
@ -47,9 +51,12 @@ export class Twin extends EventEmitter {
|
|||
* @private
|
||||
* @param client The device client to use in order to communicate with the Azure IoT hub.
|
||||
*/
|
||||
constructor(transport: Client.Transport) {
|
||||
constructor(transport: Client.Transport, retryPolicy: RetryPolicy, maxTimeout: number) {
|
||||
super();
|
||||
this._transport = transport;
|
||||
this._retryPolicy = retryPolicy;
|
||||
this._maxOperationTimeout = maxTimeout;
|
||||
this.desiredPropertiesUpdatesEnabled = false;
|
||||
this.on('newListener', this._handleNewListener.bind(this));
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_001: [The `Twin` constructor shall subscribe to the `onDesiredPropertiesUpdate` event off the `transport` object.]*/
|
||||
this._transport.on('twinDesiredPropertiesUpdate', this._onDesiredPropertiesUpdate.bind(this));
|
||||
|
@ -61,26 +68,46 @@ export class Twin extends EventEmitter {
|
|||
* @param callback function that shall be called back with either the twin or an error if the transport fails to retrieve the twin.
|
||||
*/
|
||||
get(callback: (err: Error, twin?: Twin) => void): void {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_002: [The `get` method shall call the `getTwin` method of the `Transport` object with a callback.]*/
|
||||
this._transport.getTwin((err, twinProperties) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_003: [If the callback passed to the `getTwin` method is called with an error, the `callback` passed to the call to the `get` method shall be called with that error.]*/
|
||||
callback(err);
|
||||
} else {
|
||||
this._clearCachedProperties();
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_004: [If the callback passed to the `getTwin` method is called with no error and a `TwinProperties` object, these properties shall be merged with the current instance properties.]*/
|
||||
this._mergePatch(this.properties.desired, twinProperties.desired);
|
||||
this._mergePatch(this.properties.reported, twinProperties.reported);
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_006: [For each desired property that is part of the `TwinProperties` object received, an event named after the path to this property shall be fired.]*/
|
||||
this._fireChangeEvents(this.properties.desired);
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_005: [Once the properties have been merged the `callback` method passed to the call to `get` shall be called with a first argument that is `null` and a second argument that is the current `Twin` instance (`this`).]*/
|
||||
callback(null, this);
|
||||
}
|
||||
});
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_002: [The `get` method shall call the `getTwin` method of the `Transport` object with a callback.]*/
|
||||
this._transport.getTwin((err, twinProperties) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_003: [If the callback passed to the `getTwin` method is called with an error, the `callback` passed to the call to the `get` method shall be called with that error.]*/
|
||||
opCallback(err);
|
||||
} else {
|
||||
this._clearCachedProperties();
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_004: [If the callback passed to the `getTwin` method is called with no error and a `TwinProperties` object, these properties shall be merged with the current instance properties.]*/
|
||||
this._mergePatch(this.properties.desired, twinProperties.desired);
|
||||
this._mergePatch(this.properties.reported, twinProperties.reported);
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_006: [For each desired property that is part of the `TwinProperties` object received, an event named after the path to this property shall be fired.]*/
|
||||
this._fireChangeEvents(this.properties.desired);
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_005: [Once the properties have been merged the `callback` method passed to the call to `get` shall be called with a first argument that is `null` and a second argument that is the current `Twin` instance (`this`).]*/
|
||||
opCallback(null, this);
|
||||
}
|
||||
});
|
||||
}, callback);
|
||||
}
|
||||
|
||||
private _enableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void {
|
||||
this._transport.enableTwinDesiredPropertiesUpdates(callback);
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
setRetryPolicy(policy: RetryPolicy): void {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_014: [the `retryPolicy` object passed to the `setRetryPolicy` method shall be used to retry any subsequent operation.]*/
|
||||
this._retryPolicy = policy;
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
enableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void {
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
this._transport.enableTwinDesiredPropertiesUpdates((err) => {
|
||||
this.desiredPropertiesUpdatesEnabled = !err;
|
||||
opCallback(err);
|
||||
});
|
||||
}, callback);
|
||||
}
|
||||
|
||||
// Note: Since we currently don't keep track of listeners, so we don't "disable" the twin properties updates when no one is listening.
|
||||
|
@ -90,19 +117,22 @@ export class Twin extends EventEmitter {
|
|||
// }
|
||||
|
||||
private _updateReportedProperties(state: any, done: (err?: Error) => void): void {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_007: [The `update` method shall call the `updateReportedProperties` method of the `Transport` object and pass it the patch object and a callback accepting an error as argument.]*/
|
||||
this._transport.updateTwinReportedProperties(state, (err) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_008: [If the callback passed to the transport is called with an error, the `callback` argument of the `update` method shall be called with that error.]*/
|
||||
done(err);
|
||||
} else {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_18_031: [If the callback passed to the transport is called with no error, the `properties.reported.update` shall merge the contents of the patch object into `properties.reported`]*/
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_18_032: [When merging the patch, if any properties are set to `null`, `properties.reported.update` shall delete that property from `properties.reported`.]*/
|
||||
this._mergePatch(this.properties.reported, state);
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_009: [Once the properties have been merged the `callback` argument of the `update` method shall be called with no argument.]*/
|
||||
done();
|
||||
}
|
||||
});
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_007: [The `update` method shall call the `updateReportedProperties` method of the `Transport` object and pass it the patch object and a callback accepting an error as argument.]*/
|
||||
this._transport.updateTwinReportedProperties(state, (err) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_008: [If the callback passed to the transport is called with an error, the `callback` argument of the `update` method shall be called with that error.]*/
|
||||
opCallback(err);
|
||||
} else {
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_18_031: [If the callback passed to the transport is called with no error, the `properties.reported.update` shall merge the contents of the patch object into `properties.reported`]*/
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_18_032: [When merging the patch, if any properties are set to `null`, `properties.reported.update` shall delete that property from `properties.reported`.]*/
|
||||
this._mergePatch(this.properties.reported, state);
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_009: [Once the properties have been merged the `callback` argument of the `update` method shall be called with no argument.]*/
|
||||
opCallback();
|
||||
}
|
||||
});
|
||||
}, done);
|
||||
}
|
||||
|
||||
/* Codes_SRS_NODE_DEVICE_TWIN_18_031: [** `properties.reported.update` shall merge the contents of the patch object into `properties.reported` **]** */
|
||||
|
@ -165,7 +195,7 @@ export class Twin extends EventEmitter {
|
|||
}
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_010: [When a listener is added for an event name starting with `properties.desired` the `enableTwinDesiredPropertiesUpdates` method of the `Transport` object shall be called with a callback function accepting an optional error argument.]*/
|
||||
this._enableTwinDesiredPropertiesUpdates((err) => {
|
||||
this.enableTwinDesiredPropertiesUpdates((err) => {
|
||||
if (err) {
|
||||
debug('error enabling desired properties updates: ' + err.toString());
|
||||
/*Codes_SRS_NODE_DEVICE_TWIN_16_011: [If the callback passed to the transport is called with an error, that error shall be emitted by the Twin object.]*/
|
||||
|
|
|
@ -912,6 +912,131 @@ describe('Client', function () {
|
|||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_096: [The `setRetryPolicy` method shall call the `setRetryPolicy` method on the twin if it is set and pass it the `policy` object.]*/
|
||||
it('updates the twin retry policy', function (testCallback) {
|
||||
var newPolicy = {
|
||||
shouldRetry: function () {},
|
||||
nextRetryTimeout: function () {}
|
||||
};
|
||||
|
||||
var fakeTransport = new EventEmitter();
|
||||
fakeTransport.getTwin = sinon.stub().callsArgWith(0, null, new Twin(fakeTransport, {}, 0));
|
||||
|
||||
var client = new Client(fakeTransport);
|
||||
client.getTwin(function (err, twin) {
|
||||
sinon.spy(twin, 'setRetryPolicy');
|
||||
client.setRetryPolicy(newPolicy);
|
||||
assert.isTrue(twin.setRetryPolicy.calledOnce);
|
||||
assert.isTrue(twin.setRetryPolicy.calledWith(newPolicy));
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('transport.on(\'disconnect\') handler', function () {
|
||||
var fakeTransport, fakeRetryPolicy;
|
||||
beforeEach(function () {
|
||||
fakeRetryPolicy = {
|
||||
shouldRetry: function () { return true; },
|
||||
nextRetryTimeout: function () { return 1; }
|
||||
};
|
||||
|
||||
fakeTransport = new EventEmitter();
|
||||
fakeTransport.enableC2D = sinon.stub().callsArg(0);
|
||||
fakeTransport.enableTwinDesiredPropertiesUpdates = sinon.stub().callsArg(0);
|
||||
fakeTransport.enableMethods = sinon.stub().callsArg(0);
|
||||
fakeTransport.onDeviceMethod = sinon.stub();
|
||||
fakeTransport.getTwin = sinon.stub().callsArgWith(0, null, new Twin(fakeTransport, fakeRetryPolicy));
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_097: [If the transport emits a `disconnect` event the client while subscribed to C2D messages the retry policy shall try to re-enable the C2D functionality using the transport `enableC2D` method.]*/
|
||||
it('reenables C2D after being disconnected if C2D was enabled', function () {
|
||||
var client = new Client(fakeTransport);
|
||||
client.setRetryPolicy(fakeRetryPolicy);
|
||||
client.on('message', function () {});
|
||||
assert.isTrue(fakeTransport.enableC2D.calledOnce);
|
||||
fakeTransport.emit('disconnect', new errors.TimeoutError()); // timeouts can be retried
|
||||
assert.isTrue(fakeTransport.enableC2D.calledTwice);
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_102: [If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
|
||||
it('emits a disconnect event if reenabling C2D fails', function (testCallback) {
|
||||
var fakeError = new Error('fake');
|
||||
var client = new Client(fakeTransport);
|
||||
client.on('disconnect', function (err) {
|
||||
assert.instanceOf(err, results.Disconnected);
|
||||
assert.strictEqual(err.transportObj, fakeError);
|
||||
testCallback();
|
||||
});
|
||||
|
||||
client.setRetryPolicy(fakeRetryPolicy);
|
||||
client._maxOperationTimeout = 1;
|
||||
client.on('message', function () {});
|
||||
assert.isTrue(fakeTransport.enableC2D.calledOnce);
|
||||
fakeTransport.enableC2D = sinon.stub().callsArgWith(0, fakeError);
|
||||
fakeTransport.emit('disconnect', new errors.TimeoutError()); // timeouts can be retried
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_099: [If the transport emits a `disconnect` event the client while subscribed to desired properties updates the retry policy shall try to re-enable the twin desired properties updates using the transport `enableTwinDesiredPropertiesUpdates` method.]*/
|
||||
it('reenables device methods after being disconnected if methods were enabled', function () {
|
||||
var client = new Client(fakeTransport);
|
||||
client.setRetryPolicy(fakeRetryPolicy);
|
||||
client.onDeviceMethod('method', function () {});
|
||||
assert.isTrue(fakeTransport.enableMethods.calledOnce);
|
||||
fakeTransport.emit('disconnect', new errors.TimeoutError()); // timeouts can be retried
|
||||
assert.isTrue(fakeTransport.enableMethods.calledTwice);
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_100: [If the retry policy fails to reestablish the direct methods functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
|
||||
it('emits a disconnect event if reenabling methods fails', function (testCallback) {
|
||||
var fakeError = new Error('fake');
|
||||
var client = new Client(fakeTransport);
|
||||
client.on('disconnect', function (err) {
|
||||
assert.instanceOf(err, results.Disconnected);
|
||||
assert.strictEqual(err.transportObj, fakeError);
|
||||
testCallback();
|
||||
});
|
||||
|
||||
client.setRetryPolicy(fakeRetryPolicy);
|
||||
client._maxOperationTimeout = 1;
|
||||
client.onDeviceMethod('method', function () {});
|
||||
assert.isTrue(fakeTransport.enableMethods.calledOnce);
|
||||
fakeTransport.enableMethods = sinon.stub().callsArgWith(0, fakeError);
|
||||
fakeTransport.emit('disconnect', new errors.TimeoutError()); // timeouts can be retried
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_098: [If the transport emits a `disconnect` event the client while subscribed to direct methods the retry policy shall try to re-enable the direct methods functionality using the transport `enableMethods` method.]*/
|
||||
it('reenables device methods after being disconnected if Twin desired properties updates were enabled', function () {
|
||||
var client = new Client(fakeTransport);
|
||||
client.setRetryPolicy(fakeRetryPolicy);
|
||||
client.getTwin(function (err, twin) {
|
||||
twin.on('properties.desired', function () {});
|
||||
assert.isTrue(fakeTransport.enableTwinDesiredPropertiesUpdates.calledOnce);
|
||||
fakeTransport.emit('disconnect', new errors.TimeoutError()); // timeouts can be retried
|
||||
assert.isTrue(fakeTransport.enableTwinDesiredPropertiesUpdates.calledTwice);
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_101: [If the retry policy fails to reestablish the twin desired properties updates functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
|
||||
it('emits a disconnect event if reenabling twin desired properties updates fails', function (testCallback) {
|
||||
var fakeError = new Error('fake');
|
||||
var client = new Client(fakeTransport);
|
||||
client.on('disconnect', function (err) {
|
||||
assert.instanceOf(err, results.Disconnected);
|
||||
assert.strictEqual(err.transportObj, fakeError);
|
||||
testCallback();
|
||||
});
|
||||
|
||||
client.setRetryPolicy(fakeRetryPolicy);
|
||||
client.getTwin(function (err, twin) {
|
||||
twin.on('properties.desired', function () {});
|
||||
client._twin._maxOperationTimeout = 1;
|
||||
assert.isTrue(fakeTransport.enableTwinDesiredPropertiesUpdates.calledOnce);
|
||||
fakeTransport.enableTwinDesiredPropertiesUpdates = sinon.stub().callsArgWith(0, fakeError);
|
||||
fakeTransport.emit('disconnect', new errors.TimeoutError()); // timeouts can be retried
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ var sinon = require('sinon');
|
|||
var _ = require('lodash');
|
||||
|
||||
describe('Twin', function () {
|
||||
var fakeTransport, fakeTwin;
|
||||
var fakeTransport, fakeTwin, fakeRetryPolicy;
|
||||
|
||||
beforeEach(function () {
|
||||
fakeTwin = {
|
||||
|
@ -24,6 +24,11 @@ describe('Twin', function () {
|
|||
}
|
||||
};
|
||||
|
||||
fakeRetryPolicy = {
|
||||
shouldRetry: function () { return false; },
|
||||
getNextRetryTimeout: function () { return 0; }
|
||||
};
|
||||
|
||||
fakeTransport = new EventEmitter();
|
||||
fakeTransport.getTwin = sinon.stub().callsArgWith(0, null, fakeTwin);
|
||||
fakeTransport.updateTwinReportedProperties = sinon.stub().callsArg(1);
|
||||
|
@ -43,7 +48,7 @@ describe('Twin', function () {
|
|||
describe('#get', function () {
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_16_002: [The `get` method shall call the `getTwin` method of the `Transport` object with a callback.]*/
|
||||
it('calls getTwin on the transpport', function () {
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
twin.get(function () {});
|
||||
assert.isTrue(fakeTransport.getTwin.calledOnce);
|
||||
});
|
||||
|
@ -51,7 +56,7 @@ describe('Twin', function () {
|
|||
/*Tests_SRS_NODE_DEVICE_TWIN_16_004: [If the callback passed to the `getTwin` method is called with no error and a `TwinProperties` object, these properties shall be merged with the current instance properties.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_16_005: [Once the properties have been merged the `callback` method passed to the call to `get` shall be called with a first argument that is `null` and a second argument that is the current `Twin` instance (`this`).]*/
|
||||
it('calls its callback with the twin after it is merged', function (testCallback) {
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
twin.get(function (err, twin) {
|
||||
assert.deepEqual(twin.properties.desired, fakeTwin.desired);
|
||||
assert.deepEqual(twin.properties.reported.key, fakeTwin.reported.key);
|
||||
|
@ -63,7 +68,7 @@ describe('Twin', function () {
|
|||
it('calls its callback with an error if the transport encounters an error', function (testCallback) {
|
||||
var fakeError = new Error('fake');
|
||||
fakeTransport.getTwin = sinon.stub().callsArgWith(0, fakeError);
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
twin.get(function (err) {
|
||||
assert.strictEqual(err, fakeError);
|
||||
testCallback();
|
||||
|
@ -72,7 +77,7 @@ describe('Twin', function () {
|
|||
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_16_006: [For each desired property that is part of the `TwinProperties` object received, an event named after the path to this property shall be fired.]*/
|
||||
it('fires events for the new properties that have been merged', function (testCallback) {
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
var genericEventReceived = false;
|
||||
var specificEventReceived = false;
|
||||
twin.on('properties.desired', function (delta) {
|
||||
|
@ -97,7 +102,7 @@ describe('Twin', function () {
|
|||
describe('properties.reported.update', function () {
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_16_007: [The `update` method shall call the `updateReportedProperties` method of the `Transport` object and pass it the patch object and a callback accepting an error as argument.]*/
|
||||
it('calls updateTwinReportedProperties on the transport', function (testCallback) {
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
var fakePatch = { key: 'value' };
|
||||
twin.get(function () {
|
||||
twin.properties.reported.update(fakePatch, function () {});
|
||||
|
@ -112,7 +117,7 @@ describe('Twin', function () {
|
|||
var fakeError = new Error('fake');
|
||||
fakeTransport.updateTwinReportedProperties = sinon.stub().callsArgWith(1, fakeError);
|
||||
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
twin.get(function () {
|
||||
twin.properties.reported.update({ reported: 'fake' }, function (err) {
|
||||
assert.strictEqual(err, fakeError);
|
||||
|
@ -123,10 +128,10 @@ describe('Twin', function () {
|
|||
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_16_009: [Once the properties have been merged the `callback` argument of the `update` method shall be called with no argument.]*/
|
||||
it('calls its callback with no arguments if the update succeeds', function (testCallback) {
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
twin.get(function () {
|
||||
twin.properties.reported.update({ reported: 'fake' }, function (err) {
|
||||
assert.isUndefined(err);
|
||||
assert.isNull(err);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
@ -134,7 +139,7 @@ describe('Twin', function () {
|
|||
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_18_031: [If the callback passed to the transport is called with no error, the `properties.reported.update` shall merge the contents of the patch object into `properties.reported`]*/
|
||||
it('merges the patch in the reported properties', function (testCallback) {
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
var fakePatch = { key: 'fake' };
|
||||
twin.get(function () {
|
||||
twin.properties.reported.update(fakePatch, function (err) {
|
||||
|
@ -163,7 +168,7 @@ describe('Twin', function () {
|
|||
describe('on(\'properties.desired[.path]\'', function () {
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_16_010: [When a listener is added for an event name starting with `properties.desired` the `enableTwinDesiredPropertiesUpdates` method of the `Transport` object shall be called with a callback function accepting an optional error argument.]*/
|
||||
it('calls enableTwinDesiredPropertiesUpdates on the transport', function () {
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
twin.on('properties.desired', function () {});
|
||||
assert.isTrue(fakeTransport.enableTwinDesiredPropertiesUpdates.calledOnce);
|
||||
});
|
||||
|
@ -172,7 +177,7 @@ describe('Twin', function () {
|
|||
it('emits an error if the call to enableTwinDesiredPropertiesUpdates fails', function (testCallback) {
|
||||
var fakeError = new Error('fake');
|
||||
fakeTransport.enableTwinDesiredPropertiesUpdates = sinon.stub().callsArgWith(0, fakeError);
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
twin.on('error', function (err) {
|
||||
assert.strictEqual(err, fakeError);
|
||||
testCallback();
|
||||
|
@ -180,7 +185,6 @@ describe('Twin', function () {
|
|||
twin.on('properties.desired', function () {});
|
||||
});
|
||||
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_18_045: [If a property is already set when a handler is added for that property, the `Twin` object shall fire a property changed event for the property.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_16_012: [When a `twinDesiredPropertiesUpdates` event is emitted by the transport, the property patch passed as argument to the event handler shall be merged with the current desired properties.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_16_013: [Recursively for each desired property that is part of the patch received, an event named using the convention `properties.desired[.path]` shall be fired with an argument containing the value of the property.]*/
|
||||
|
@ -204,7 +208,7 @@ describe('Twin', function () {
|
|||
}
|
||||
}
|
||||
|
||||
var twin = new Twin(fakeTransport);
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy, 0);
|
||||
twin.get(function () {
|
||||
twin.on('properties.desired', function (delta) {
|
||||
assert.deepEqual(delta.prop1, fakePatch.prop1);
|
||||
|
@ -235,4 +239,24 @@ describe('Twin', function () {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('#setRetryPolicy', function () {
|
||||
/*Tests_SRS_NODE_DEVICE_TWIN_16_014: [the `retryPolicy` object passed to the `setRetryPolicy` method shall be used to retry any subsequent operation.]*/
|
||||
it('uses the retry policy passed as an argument in the subsequent calls', function (testCallback) {
|
||||
var testPolicy = {
|
||||
shouldRetry: sinon.stub().returns(false),
|
||||
nextRetryTimeout: sinon.stub().returns(-1)
|
||||
};
|
||||
fakeTransport.getTwin = sinon.stub().callsArgWith(0, new Error('fake'));
|
||||
|
||||
var twin = new Twin(fakeTransport, fakeRetryPolicy);
|
||||
twin.setRetryPolicy(testPolicy);
|
||||
twin.get(function() {
|
||||
assert.isTrue(testPolicy.shouldRetry.calledOnce);
|
||||
assert.isTrue(testPolicy.nextRetryTimeout.notCalled); //shouldRetry being false, nextRetryTimeout should not have been called.
|
||||
assert.isTrue(fakeTransport.getTwin.calledOnce);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -268,7 +268,11 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin.
|
|||
|
||||
**SRS_NODE_DEVICE_MQTT_16_058: [** `enableTwinDesiredPropertiesUpdates` shall calls its callback with an `Error` object if it fails to connect. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_MQTT_16_059: [** `enableTwinDesiredPropertiesUpdates` shall call the `enableTwinDesiredPropertiesUpdates` on the `MqttTwinClient` object created by the constructor and pass it its callback. **]**
|
||||
**SRS_NODE_DEVICE_MQTT_16_059: [** `enableTwinDesiredPropertiesUpdates` shall subscribe to the MQTT topics for twins. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_MQTT_16_060: [** `enableTwinDesiredPropertiesUpdates` shall call its callback with no arguments when the `SUBACK` packet is received. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_MQTT_16_061: [** `enableTwinDesiredPropertiesUpdates` shall call its callback with an `Error` if subscribing to the topics fails. **]**
|
||||
|
||||
### disableC2D
|
||||
|
||||
|
@ -294,5 +298,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin.
|
|||
|
||||
**SRS_NODE_DEVICE_MQTT_16_062: [** `disableTwinDesiredPropertiesUpdates` shall call its callback immediately if the MQTT connection is already disconnected. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_MQTT_16_083: [** `disableTwinDesiredPropertiesUpdates` shall call the `disableTwinDesiredPropertiesUpdates` on the `MqttTwinClient` object created by the constructor and pass it its callback. **]**
|
||||
**SRS_NODE_DEVICE_MQTT_16_063: [** `disableTwinDesiredPropertiesUpdates` shall unsubscribe from the topics for twin messages. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_MQTT_16_064: [** `disableTwinDesiredPropertiesUpdates` shall call its callback with no arguments when the `UNSUBACK` packet is received. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_MQTT_16_065: [** `disableTwinDesiredPropertiesUpdates` shall call its callback with an `Error` if an error is received while unsubscribing. **]**
|
||||
|
|
|
@ -149,7 +149,6 @@ protocolAndTermination.forEach( function (testConfiguration) {
|
|||
if (err) return done(err);
|
||||
|
||||
deviceClient = deviceSdk.Client.fromConnectionString(deviceDescription.connectionString, testConfiguration.transport);
|
||||
deviceClient.setRetryPolicy(new NoRetry());
|
||||
|
||||
deviceClient.open(function(err) {
|
||||
if (err) return done(err);
|
||||
|
@ -187,6 +186,7 @@ protocolAndTermination.forEach( function (testConfiguration) {
|
|||
|
||||
doConnectTest(testConfiguration.testEnabled)('Simple twin update: device receives it, and' + testConfiguration.closeReason + 'which is noted by the iot hub client', function(testCallback) {
|
||||
this.timeout(120000);
|
||||
deviceClient.setRetryPolicy(new NoRetry());
|
||||
debug('about to connect a disconnect listener.');
|
||||
deviceClient.on('disconnect', function () {
|
||||
debug('We did get a disconnect message');
|
||||
|
@ -215,7 +215,7 @@ protocolAndTermination.forEach( function (testConfiguration) {
|
|||
});
|
||||
});
|
||||
|
||||
doConnectTest(false)('Simple twin update: device receives it, and' + testConfiguration.closeReason + 'which is NOT noted by the iot hub client', function(testCallback) {
|
||||
doConnectTest(testConfiguration.testEnabled)('Simple twin update: device receives it, and' + testConfiguration.closeReason + 'which is NOT noted by the iot hub client', function(testCallback) {
|
||||
this.timeout(120000);
|
||||
debug('about to connect a disconnect listener.');
|
||||
deviceClient.on('disconnect', function () {
|
||||
|
@ -229,27 +229,27 @@ protocolAndTermination.forEach( function (testConfiguration) {
|
|||
});
|
||||
};
|
||||
assert.equal(deviceTwin.properties.desired.$version,1);
|
||||
deviceTwin.on('properties.desired', function() {
|
||||
if (deviceTwin.properties.desired.$version === 1) {
|
||||
// ignore $update === 1. assert needed to make jshint happy
|
||||
assert(true);
|
||||
} else if (deviceTwin.properties.desired.$version === 2) {
|
||||
var terminateMessage = new Message(' ');
|
||||
terminateMessage.properties.add('AzIoTHub_FaultOperationType', testConfiguration.operationType);
|
||||
terminateMessage.properties.add('AzIoTHub_FaultOperationCloseReason', testConfiguration.closeReason);
|
||||
terminateMessage.properties.add('AzIoTHub_FaultOperationDelayInSecs', testConfiguration.delayInSeconds);
|
||||
deviceClient.sendEvent(terminateMessage, function (sendErr) {
|
||||
debug('at the callback for the fault injection send, err is:' + sendErr);
|
||||
});
|
||||
setTwinMoreNewPropsTimeout = setTimeout(setTwinMoreNewProps, (testConfiguration.delayInSeconds + 5) * 1000);
|
||||
} else if (deviceTwin.properties.desired.$version === 3) {
|
||||
testCallback();
|
||||
} else {
|
||||
testCallback(new Error('incorrect property version received - ' + deviceTwin.properties.desired.$version));
|
||||
}
|
||||
});
|
||||
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
|
||||
if (err) return testCallback(err);
|
||||
deviceTwin.on('properties.desired', function() {
|
||||
if (deviceTwin.properties.desired.$version === 1) {
|
||||
// ignore $update === 1. assert needed to make jshint happy
|
||||
assert(true);
|
||||
} else if (deviceTwin.properties.desired.$version === 2) {
|
||||
var terminateMessage = new Message('');
|
||||
terminateMessage.properties.add('AzIoTHub_FaultOperationType', testConfiguration.operationType);
|
||||
terminateMessage.properties.add('AzIoTHub_FaultOperationCloseReason', testConfiguration.closeReason);
|
||||
terminateMessage.properties.add('AzIoTHub_FaultOperationDelayInSecs', testConfiguration.delayInSeconds);
|
||||
deviceClient.sendEvent(terminateMessage, function (sendErr) {
|
||||
debug('at the callback for the fault injection send, err is:' + sendErr);
|
||||
});
|
||||
setTwinMoreNewPropsTimeout = setTimeout(setTwinMoreNewProps.bind(this), testConfiguration.delayInSeconds + 1000);
|
||||
} else if (deviceTwin.properties.desired.$version === 3) {
|
||||
testCallback();
|
||||
} else {
|
||||
testCallback(new Error('incorrect property version received - ' + deviceTwin.properties.desired.$version));
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
Загрузка…
Ссылка в новой задаче