Modules over AMQP
This commit is contained in:
Родитель
ff08bf1f9a
Коммит
ceec20c38b
|
@ -6,16 +6,20 @@
|
|||
|
||||
export const apiVersion = '2017-11-08-preview';
|
||||
|
||||
export function devicePath(id: string): string {
|
||||
return '/devices/' + id;
|
||||
export function devicePath(deviceId: string, moduleId?: string): string {
|
||||
if (moduleId) {
|
||||
return '/devices/' + deviceId + '/modules/' + moduleId;
|
||||
} else {
|
||||
return '/devices/' + deviceId;
|
||||
}
|
||||
}
|
||||
|
||||
export function eventPath(id: string): string {
|
||||
return devicePath(id) + '/messages/events';
|
||||
export function eventPath(deviceId: string, moduleId?: string): string {
|
||||
return devicePath(deviceId, moduleId) + '/messages/events';
|
||||
}
|
||||
|
||||
export function messagePath(id: string): string {
|
||||
return devicePath(id) + '/messages/devicebound';
|
||||
export function messagePath(deviceId: string, moduleId?: string): string {
|
||||
return devicePath(deviceId, moduleId) + '/messages/devicebound';
|
||||
}
|
||||
|
||||
export function feedbackPath(id: string, lockToken: string): string {
|
||||
|
|
|
@ -8,31 +8,55 @@ var assert = require('chai').assert;
|
|||
var endpoint = require('../lib/endpoint.js');
|
||||
|
||||
describe('endpoint', function () {
|
||||
describe('#devicePath', function () {
|
||||
it('matches /devices/<device-id>', function () {
|
||||
var path = endpoint.devicePath('mydevice');
|
||||
assert.equal('/devices/mydevice', path);
|
||||
[{
|
||||
testName: 'with device only',
|
||||
deviceId: 'mydevice',
|
||||
moduleId: undefined,
|
||||
devicePath: '/devices/mydevice',
|
||||
eventPath: '/devices/mydevice/messages/events',
|
||||
messagePath: '/devices/mydevice/messages/devicebound'
|
||||
},{
|
||||
testName: 'with device and module',
|
||||
deviceId: 'mydevice',
|
||||
moduleId: 'mymodule',
|
||||
devicePath: '/devices/mydevice/modules/mymodule',
|
||||
eventPath: '/devices/mydevice/modules/mymodule/messages/events',
|
||||
messagePath: '/devices/mydevice/modules/mymodule/messages/devicebound'
|
||||
}
|
||||
].forEach(function(testConfig) {
|
||||
describe('#devicePath ' + testConfig.testName, function () {
|
||||
it('matches ' + testConfig.devicePath, function () {
|
||||
var path = endpoint.devicePath(testConfig.deviceId, testConfig.moduleId);
|
||||
assert.strictEqual(testConfig.devicePath, path);
|
||||
});
|
||||
});
|
||||
|
||||
describe('#eventPath ' + testConfig.testName, function () {
|
||||
it('matches ' + testConfig.eventPath, function () {
|
||||
var path = endpoint.eventPath(testConfig.deviceId, testConfig.moduleId);
|
||||
assert.strictEqual(testConfig.eventPath, path);
|
||||
});
|
||||
});
|
||||
|
||||
describe('#messagePath ' + testConfig.testName, function () {
|
||||
it('matches ' + testConfig.messagePath, function () {
|
||||
var path = endpoint.messagePath(testConfig.deviceId, testConfig.moduleId);
|
||||
assert.strictEqual(testConfig.messagePath, path);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('#eventPath', function () {
|
||||
it('matches /devices/<device-id>/messages/events', function () {
|
||||
var path = endpoint.eventPath('mydevice');
|
||||
assert.equal('/devices/mydevice/messages/events', path);
|
||||
});
|
||||
});
|
||||
|
||||
describe('#messagePath', function () {
|
||||
it('matches /devices/<device-id>/messages/devicebound', function () {
|
||||
var path = endpoint.messagePath('mydevice');
|
||||
assert.equal('/devices/mydevice/messages/devicebound', path);
|
||||
});
|
||||
});
|
||||
|
||||
describe('#feedbackPath', function () {
|
||||
it('matches /devices/<device-id>/messages/devicebound/<lockToken>', function () {
|
||||
var path = endpoint.feedbackPath('mydevice', '55E68746-0AD9-4DCF-9906-79CDAC14FFBA');
|
||||
assert.equal('/devices/mydevice/messages/devicebound/55E68746-0AD9-4DCF-9906-79CDAC14FFBA', path);
|
||||
assert.strictEqual('/devices/mydevice/messages/devicebound/55E68746-0AD9-4DCF-9906-79CDAC14FFBA', path);
|
||||
});
|
||||
});
|
||||
|
||||
describe('versionQueryString', function() {
|
||||
it('matches ?api-version=' + endpoint.apiVersion, function () {
|
||||
var apiVer = endpoint.versionQueryString();
|
||||
assert.strictEqual('?api-version=' + endpoint.apiVersion, apiVer);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -17,8 +17,19 @@ The following table shows which clients support which events & messages:
|
|||
|
||||
| client type | `sendEvent` | `sendOutputEvent` | Messages | InputMessages |
|
||||
|------------ | --------- | --------------- | -------- | ------------- |
|
||||
| device | yes | (theoretically yes) | yes | no |
|
||||
| module | yes | yes | no | yes|
|
||||
| device | yes | no | yes | no |
|
||||
| module | no | yes | no | yes|
|
||||
|
||||
## Notes on inputName and outputName encoding
|
||||
| parameter | encoding for MQTT | encoding for AMQP |
|
||||
|-|-|-|
|
||||
| outputName | encoded in outgoing topic name. E.g. "devices/\<deviceId>/modules/\<moduleId>/messages/events/$.on=\<outputName>" | uses the same link as telemetry, outputName is encoded in an annotation named "x-opt-output-name" |
|
||||
| inputName | encoded in incoming topic name E.g. "devices/\<deviceId>/modules/\<moduleId>/messages/inputs/\<inputName>" | uses the same link as C2D, inputName is encoded in an annotation named "x-opt-input-name" |
|
||||
|
||||
Put another way:
|
||||
* AMQP uses the C2D and telemetry links for inputs and outputs. This applies to both the device SDK and the service SDK. inputName and outputName are stored in annotations.
|
||||
* For outputs, MQTT uses the same base topic string as for telemetry, with the outputName being stored in the newly added $.on querystring value.
|
||||
* For inputs, MQTT has an entirely new topic string, ending in /inputs/\<inputName>
|
||||
|
||||
## Design principles
|
||||
1. The concept of "module" is completely orthogonal to the concepts of `inputName` and `outputName`. The `moduleId` is one of the properties of the service connection. The `inputName` and `outputName` define how a message travels. This orthogonality is very important.
|
||||
|
|
|
@ -51,7 +51,7 @@ client.attach(function (err) {
|
|||
|
||||
**SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_014: [** The `AmqpDeviceMethodClient` object shall set 2 properties of any AMQP link that it create:
|
||||
- `com.microsoft:api-version` shall be set to the current API version in use.
|
||||
- `com.microsoft:channel-correlation-id` shall be set to the identifier of the device (also often referred to as `deviceId`). **]**
|
||||
- `com.microsoft:channel-correlation-id` shall be set to the string "methods:" followed by a guid **]**
|
||||
|
||||
**SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_019: [** The `attach` method shall create a SenderLink and a ReceiverLink and attach them. **]**
|
||||
|
||||
|
@ -63,6 +63,8 @@ client.attach(function (err) {
|
|||
|
||||
**SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_017: [** The endpoint used to for the sender and receiver link shall be `/devices/<device-id>/methods/devicebound` **]**
|
||||
|
||||
**SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_18_001: [** If a `moduleId` value was set in the device's connection string, The endpoint used to for the sender and receiver link shall be `/devices/<deviceId>/modules/<moduleId>/methods/devicebound` **]**
|
||||
|
||||
### detach(callback)
|
||||
|
||||
**SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_022: [** The `detach` method shall detach both Sender and Receiver links. **]**
|
||||
|
|
|
@ -121,6 +121,8 @@ class AmqpTwinClient extends EventEmitter {
|
|||
|
||||
**SRS_NODE_DEVICE_AMQP_TWIN_06_007: [** The endpoint argument for `attachReceiverLink` shall be `/devices/<deviceId>/twin`. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_TWIN_18_001: [** If a `moduleId` value was set in the device's connection string, the endpoint argument for `attachReceiverLink` shall be `/devices/<deviceId>/modules/<moduleId>/twin` **]**
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_TWIN_06_008: [** The link options argument for `attachReceiverLink` shall be:
|
||||
attach: {
|
||||
properties: {
|
||||
|
@ -134,6 +136,8 @@ class AmqpTwinClient extends EventEmitter {
|
|||
|
||||
**SRS_NODE_DEVICE_AMQP_TWIN_06_009: [** The endpoint argument for `attachSenderLink` shall be `/device/<deviceId>/twin`. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_TWIN_18_002: [** If a `moduleId` value was set in the device's connection string, the endpoint argument for `attachSenderLink` shall be `/device/<deviceId>/modules/<moduleId>/twin`. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_TWIN_06_010: [** The link options argument for `attachSenderLink` shall be:
|
||||
attach: {
|
||||
properties: {
|
||||
|
|
|
@ -109,6 +109,25 @@ The `sendEvent` method sends an event to the IoT Hub as the device indicated in
|
|||
|
||||
**SRS_NODE_DEVICE_AMQP_16_052: [** The `sendEventBatch` method shall throw a `NotImplementedError`. **]**
|
||||
|
||||
### sendOutputEvent(outputName: string, message: Message, done: (err?: Error, result?: results.MessageEnqueued) => void): void;
|
||||
The `sendOutputEvent` method sends an event to the IoT Hub as the device indicated in the constructor argument.
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_005: [** The `sendOutputEvent` method shall connect and authenticate the transport if necessary. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_006: [** The `sendOutputEvent` method shall create and attach the d2c link if necessary. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_007: [** The `sendOutputEvent` method shall construct an AMQP request using the message passed in argument as the body of the message. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_012: [** The `sendOutputEvent` method shall set the annotation "x-opt-output-name" on the message to the `outputName`. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_008: [** The `sendOutputEvent` method shall call the `done` callback with a null error object and a MessageEnqueued result object when the message has been successfully sent. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_009: [** If `sendOutputEvent` encounters an error before it can send the request, it shall invoke the `done` callback function and pass the standard JavaScript Error object with a text description of the error (err.message). **]**
|
||||
|
||||
### sendOutputEventBatch(outputName: string, messages: Message[], done: (err?: Error, result?: results.MessageEnqueued) => void): void;
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_004: [** `sendOutputEventBatch` shall throw a `NotImplementedError`. **]**
|
||||
|
||||
### getReceiver(done) [deprecated]
|
||||
This method is deprecated. The `AmqpReceiver` object and pattern is going away and the `Amqp` object now implements the `Receiver` interface until we can completely get rid of it in the device client.
|
||||
|
||||
|
@ -174,6 +193,13 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a
|
|||
|
||||
**SRS_NODE_DEVICE_AMQP_16_037: [** The `disableC2D` method shall call its `callback` immediately if the transport is already disconnected. **]**
|
||||
|
||||
### enableInputMessages(callback: (err?: Error) => void): void;
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_010: [** The `enableInputMessages` method shall enable C2D messages **]**
|
||||
|
||||
### disableInputMessages(callback: (err?: Error) => void): void;
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_011: [** The `disableInputMessages` method shall disable C2D messages **]**
|
||||
|
||||
### enableMethods(callback)
|
||||
|
||||
|
@ -252,20 +278,13 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a
|
|||
**SRS_NODE_DEVICE_AMQP_16_081: [** if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connecting or authenticating, the connection shall be stopped and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_16_082: [** if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connected, the connection shall be disconnected and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error. **]**
|
||||
### enableInputMessages(callback: (err?: Error) => void): void;
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_001: [** `enableInputMessages` shall throw a `NotImplementedError`. **]**
|
||||
|
||||
### disableInputMessages(callback: (err?: Error) => void): void;
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_002: [** `disableInputMessages` shall throw a `NotImplementedError`. **]**
|
||||
|
||||
### sendOutputEvent(outputName: string, message: Message, done: (err?: Error, result?: results.MessageEnqueued) => void): void;
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_003: [** `sendOutputEvent` shall throw a `NotImplementedError`. **]**
|
||||
|
||||
### sendOutputEventBatch(outputName: string, messages: Message[], done: (err?: Error, result?: results.MessageEnqueued) => void): void;
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_004: [** `sendOutputEventBatch` shall throw a `NotImplementedError`. **]**
|
||||
|
||||
|
||||
### message events
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_013: [** If `amqp` receives a message on the C2D link without an annotation named "x-opt-input-name", it shall emit a "message" event with the message as the event parameter. **]**
|
||||
|
||||
### inputMessage events
|
||||
|
||||
**SRS_NODE_DEVICE_AMQP_18_014: [** If `amqp` receives a message on the C2D link with an annotation named "x-opt-input-name", it shall emit a "message" event with the "x-opt-input-name" annotation as the first parameter and the message as the second parameter. **]**
|
||||
|
||||
|
|
|
@ -123,8 +123,18 @@ export class Amqp extends EventEmitter implements Client.Transport {
|
|||
this.emit('error', c2dError);
|
||||
};
|
||||
|
||||
this._c2dMessageListener = (msg) => {
|
||||
this.emit('message', AmqpMessage.toMessage(msg));
|
||||
this._c2dMessageListener = (msg: AmqpMessage) => {
|
||||
let inputName: string;
|
||||
if (msg.messageAnnotations) {
|
||||
inputName = msg.messageAnnotations['x-opt-input-name'];
|
||||
}
|
||||
if (inputName) {
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_014: [If `amqp` receives a message on the C2D link with an annotation named "x-opt-input-name", it shall emit a "message" event with the "x-opt-input-name" annotation as the first parameter and the message as the second parameter.]*/
|
||||
this.emit('inputMessage', inputName, AmqpMessage.toMessage(msg));
|
||||
} else {
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_013: [If `amqp` receives a message on the C2D link without an annotation named "x-opt-input-name", it shall emit a "message" event with the message as the event parameter.]*/
|
||||
this.emit('message', AmqpMessage.toMessage(msg));
|
||||
}
|
||||
};
|
||||
|
||||
this._d2cErrorListener = (err) => {
|
||||
|
@ -154,13 +164,14 @@ export class Amqp extends EventEmitter implements Client.Transport {
|
|||
disconnectCallback(null, new results.Disconnected());
|
||||
}
|
||||
},
|
||||
sendEvent: (message, sendCallback) => {
|
||||
sendEvent: (amqpMessage, sendCallback) => {
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_16_024: [The `sendEvent` method shall connect and authenticate the transport if necessary.]*/
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_005: [The `sendOutputEvent` method shall connect and authenticate the transport if necessary.]*/
|
||||
this._fsm.handle('connect', (err, result) => {
|
||||
if (err) {
|
||||
sendCallback(err);
|
||||
} else {
|
||||
this._fsm.handle('sendEvent', message, sendCallback);
|
||||
this._fsm.handle('sendEvent', amqpMessage, sendCallback);
|
||||
}
|
||||
});
|
||||
},
|
||||
|
@ -251,8 +262,8 @@ export class Amqp extends EventEmitter implements Client.Transport {
|
|||
/*Codes_SRS_NODE_DEVICE_AMQP_16_055: [The `connect` method shall call its callback with an error if the callback passed to the `getDeviceCredentials` method is called with an error.]*/
|
||||
this._fsm.transition('disconnected', translateError('AMQP Transport: Could not get credentials', err), connectCallback);
|
||||
} else {
|
||||
this._c2dEndpoint = endpoint.messagePath(encodeURIComponent(credentials.deviceId));
|
||||
this._d2cEndpoint = endpoint.eventPath(credentials.deviceId);
|
||||
this._c2dEndpoint = endpoint.messagePath(credentials.deviceId, credentials.moduleId);
|
||||
this._d2cEndpoint = endpoint.eventPath(credentials.deviceId, credentials.moduleId);
|
||||
|
||||
const uri = this._getConnectionUri(credentials.host);
|
||||
this._amqp.connect(uri, credentials.x509, (err, connectResult) => {
|
||||
|
@ -314,11 +325,11 @@ export class Amqp extends EventEmitter implements Client.Transport {
|
|||
},
|
||||
connect: (connectCallback) => connectCallback(null, new results.Connected()),
|
||||
disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', null, disconnectCallback),
|
||||
sendEvent: (message, sendCallback) => {
|
||||
let amqpMessage = AmqpMessage.fromMessage(message);
|
||||
sendEvent: (amqpMessage, sendCallback) => {
|
||||
amqpMessage.properties.to = this._d2cEndpoint;
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_16_025: [The `sendEvent` method shall create and attach the d2c link if necessary.]*/
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_006: [The `sendOutputEvent` method shall create and attach the d2c link if necessary.]*/
|
||||
if (!this._d2cLink) {
|
||||
this._amqp.attachSenderLink(this._d2cEndpoint, null, (err, link) => {
|
||||
if (err) {
|
||||
|
@ -541,7 +552,8 @@ export class Amqp extends EventEmitter implements Client.Transport {
|
|||
/* Codes_SRS_NODE_DEVICE_AMQP_16_003: [The sendEvent method shall call the done() callback with a null error object and a MessageEnqueued result object when the message has been successfully sent.] */
|
||||
/* Codes_SRS_NODE_DEVICE_AMQP_16_004: [If sendEvent encounters an error before it can send the request, it shall invoke the done callback function and pass the standard JavaScript Error object with a text description of the error (err.message). ] */
|
||||
sendEvent(message: Message, done: (err?: Error, result?: results.MessageEnqueued) => void): void {
|
||||
this._fsm.handle('sendEvent', message, done);
|
||||
let amqpMessage = AmqpMessage.fromMessage(message);
|
||||
this._fsm.handle('sendEvent', amqpMessage, done);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -753,24 +765,32 @@ export class Amqp extends EventEmitter implements Client.Transport {
|
|||
* @private
|
||||
*/
|
||||
enableInputMessages(callback: (err?: Error) => void): void {
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_001: [`enableInputMessages` shall throw a `NotImplementedError`.]*/
|
||||
throw new errors.NotImplementedError('Input messages are not implemented over AMQP.');
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_010: [The `enableInputMessages` method shall enable C2D messages]*/
|
||||
this._fsm.handle('enableC2D', callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
disableInputMessages(callback: (err?: Error) => void): void {
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_002: [`disableInputMessages` shall throw a `NotImplementedError`.]*/
|
||||
throw new errors.NotImplementedError('Input messages are not implemented over AMQP.');
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_011: [The `disableInputMessages` method shall disable C2D messages]*/
|
||||
this._fsm.handle('disableC2D', callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
sendOutputEvent(outputName: string, message: Message, done: (err?: Error, result?: results.MessageEnqueued) => void): void {
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_003: [`sendOutputEvent` shall throw a `NotImplementedError`.]*/
|
||||
throw new errors.NotImplementedError('Output events are not implemented over AMQP.');
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_007: [The `sendOutputEvent` method shall construct an AMQP request using the message passed in argument as the body of the message.]*/
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_008: [The `sendOutputEvent` method shall call the `done` callback with a null error object and a MessageEnqueued result object when the message has been successfully sent.]*/
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_009: [If `sendOutputEvent` encounters an error before it can send the request, it shall invoke the `done` callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/
|
||||
sendOutputEvent(outputName: string, message: Message, callback: (err?: Error, result?: results.MessageEnqueued) => void): void {
|
||||
let amqpMessage = AmqpMessage.fromMessage(message);
|
||||
if (!amqpMessage.messageAnnotations) {
|
||||
amqpMessage.messageAnnotations = {};
|
||||
}
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_18_012: [The `sendOutputEvent` method shall set the annotation "x-opt-output-name" on the message to the `outputName`.]*/
|
||||
amqpMessage.messageAnnotations['x-opt-output-name'] = outputName;
|
||||
this._fsm.handle('sendEvent', amqpMessage, callback);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
'use strict';
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
import * as uuid from 'uuid';
|
||||
import * as machina from 'machina';
|
||||
import * as async from 'async';
|
||||
import * as dbg from 'debug';
|
||||
|
@ -78,16 +79,17 @@ export class AmqpDeviceMethodClient extends EventEmitter {
|
|||
this._fsm.transition('detached', attachCallback, err);
|
||||
} else {
|
||||
/*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_017: [The endpoint used to for the sender and receiver link shall be `/devices/<device-id>/methods/devicebound`.]*/
|
||||
this._methodEndpoint = endpoint.devicePath(encodeURIComponent(credentials.deviceId)) + '/methods/devicebound';
|
||||
/*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_18_001: [If a `moduleId` value was set in the device's connection string, The endpoint used to for the sender and receiver link shall be `/devices/<deviceId>/modules/<moduleId>/methods/devicebound`.]*/
|
||||
this._methodEndpoint = endpoint.devicePath(credentials.deviceId, credentials.moduleId) + '/methods/devicebound';
|
||||
|
||||
/*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_014: [** The `AmqpDeviceMethodClient` object shall set 2 properties of any AMQP link that it create:
|
||||
- `com.microsoft:api-version` shall be set to the current API version in use.
|
||||
- `com.microsoft:channel-correlation-id` shall be set to the identifier of the device (also often referred to as `deviceId`).]*/
|
||||
- `com.microsoft:channel-correlation-id` shall be set to the string "methods:" followed by a guid.]*/
|
||||
const linkOptions = {
|
||||
attach: {
|
||||
properties: {
|
||||
'com.microsoft:api-version': endpoint.apiVersion,
|
||||
'com.microsoft:channel-correlation-id': credentials.deviceId
|
||||
'com.microsoft:channel-correlation-id': 'methods:' + uuid.v4()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -140,8 +140,10 @@ export class AmqpTwinClient extends EventEmitter {
|
|||
this._fsm.transition('detached', err, attachCallback);
|
||||
} else {
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_007: [The endpoint argument for attachReceiverLink shall be `/device/<deviceId>/twin`.] */
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_18_001: [If a `moduleId` value was set in the device's connection string, the endpoint argument for `attachReceiverLink` shall be `/devices/<deviceId>/modules/<moduleId>/twin`]*/
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_009: [The endpoint argument for attachSenderLink shall be `/device/<deviceId>/twin`.] */
|
||||
this._endpoint = endpoint.devicePath(credentials.deviceId) + '/twin';
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_18_002: [If a `moduleId` value was set in the device's connection string, the endpoint argument for `attachSenderLink` shall be `/device/<deviceId>/modules/<moduleId>/twin`.]*/
|
||||
this._endpoint = endpoint.devicePath(credentials.deviceId, credentials.moduleId) + '/twin';
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_006: [When a listener is added for the `response` event, and the `post` event is NOT already subscribed, upstream and downstream links are established via calls to `attachReceiverLink` and `attachSenderLink`.] */
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_012: [When a listener is added for the `post` event, and the `response` event is NOT already subscribed, upstream and downstream links are established via calls to `attachReceiverLink` and `attachSenderLine`.] */
|
||||
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_036: [The same correlationId shall be used for both the sender and receiver links.]*/
|
||||
|
|
|
@ -66,19 +66,19 @@ describe('AmqpDeviceMethodClient', function () {
|
|||
var fakeAmqpClient = {
|
||||
/*Tests_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_014: [The `AmqpDeviceMethodClient` object shall set 2 properties of any AMQP link that it create:
|
||||
- `com.microsoft:api-version` shall be set to the current API version in use.
|
||||
- `com.microsoft:channel-correlation-id` shall be set to the identifier of the device (also often referred to as `deviceId`).]*/
|
||||
- `com.microsoft:channel-correlation-id` shall be set to the string "methods:" followed by a guid.]*/
|
||||
/*Tests_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_017: [The endpoint used to for the sender and receiver link shall be `/devices/<device-id>/methods/devicebound`.]*/
|
||||
/*Tests_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_012: [The `AmqpDeviceMethodClient` object shall automatically establish the AMQP links required to receive method calls and send responses when either `onDeviceMethod` or `sendMethodResponse` is called.]*/
|
||||
attachSenderLink: function(ep, options, callback) {
|
||||
assert.strictEqual(ep, '/devices/' + fakeConfig.deviceId + '/methods/devicebound');
|
||||
assert.strictEqual(options.attach.properties['com.microsoft:api-version'], endpoint.apiVersion);
|
||||
assert.strictEqual(options.attach.properties['com.microsoft:channel-correlation-id'], fakeConfig.deviceId);
|
||||
assert(options.attach.properties['com.microsoft:channel-correlation-id'].startsWith('methods:'));
|
||||
callback(null, {});
|
||||
},
|
||||
attachReceiverLink: function(ep, options, callback) {
|
||||
assert.strictEqual(ep, '/devices/' + fakeConfig.deviceId + '/methods/devicebound');
|
||||
assert.strictEqual(options.attach.properties['com.microsoft:api-version'], endpoint.apiVersion);
|
||||
assert.strictEqual(options.attach.properties['com.microsoft:channel-correlation-id'], fakeConfig.deviceId);
|
||||
assert(options.attach.properties['com.microsoft:channel-correlation-id'].startsWith('methods:'));
|
||||
callback(null, fakeAmqpReceiver);
|
||||
}
|
||||
};
|
||||
|
@ -100,6 +100,56 @@ describe('AmqpDeviceMethodClient', function () {
|
|||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_017: [The endpoint used to for the sender and receiver link shall be `/devices/<device-id>/methods/devicebound`.]*/
|
||||
/*Tests_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_18_001: [If a `moduleId` value was set in the device's connection string, The endpoint used to for the sender and receiver link shall be `/devices/<deviceId>/modules/<moduleId>/methods/devicebound`.]*/
|
||||
[{
|
||||
deviceId: 'fakeDeviceId',
|
||||
moduleId: undefined,
|
||||
expectedEndpoint: '/devices/fakeDeviceId/methods/devicebound'
|
||||
},
|
||||
{
|
||||
deviceId: 'fakeDeviceId',
|
||||
moduleId: 'fakeModuleId',
|
||||
expectedEndpoint: '/devices/fakeDeviceId/modules/fakeModuleId/methods/devicebound'
|
||||
}].forEach(function(testConfig) {
|
||||
it('uses the right link parameters when moduleId is ' + testConfig.moduleId, function(testCallback) {
|
||||
var authProvider = {
|
||||
getDeviceCredentials: sinon.stub().callsArgWith(0, null, testConfig)
|
||||
};
|
||||
|
||||
var fakeAmqpReceiver = new EventEmitter();
|
||||
var fakeMethodName = 'testMethod';
|
||||
var fakeMethodRequest = new AmqpMessage();
|
||||
fakeMethodRequest.body = 'payload';
|
||||
fakeMethodRequest.properties = {
|
||||
correlationId: 'fakeCorrelationId'
|
||||
};
|
||||
fakeMethodRequest.applicationProperties = {
|
||||
'IoThub-methodname': fakeMethodName
|
||||
};
|
||||
|
||||
var fakeAmqpClient = {
|
||||
attachSenderLink: sinon.spy(function(ep, options, callback) {
|
||||
assert.strictEqual(ep, testConfig.expectedEndpoint);
|
||||
callback(null, {});
|
||||
}),
|
||||
attachReceiverLink: sinon.spy(function(ep, options, callback) {
|
||||
assert.strictEqual(ep, testConfig.expectedEndpoint);
|
||||
callback(null, fakeAmqpReceiver);
|
||||
})
|
||||
};
|
||||
|
||||
var client = new AmqpDeviceMethodClient(authProvider, fakeAmqpClient);
|
||||
client.attach(function () {
|
||||
client.onDeviceMethod(fakeMethodName, function() {});
|
||||
assert(fakeAmqpClient.attachSenderLink.calledOnce);
|
||||
assert(fakeAmqpClient.attachReceiverLink.calledOnce);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
it('saves the callback for the method even though it is detached and works when attached', function (testCallback) {
|
||||
var fakeAmqpReceiver = new EventEmitter();
|
||||
var fakeMethodName = 'testMethod';
|
||||
|
@ -215,19 +265,19 @@ describe('AmqpDeviceMethodClient', function () {
|
|||
var fakeAmqpClient = {
|
||||
/*Tests_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_014: [The `AmqpDeviceMethodClient` object shall set 2 properties of any AMQP link that it create:
|
||||
- `com.microsoft:api-version` shall be set to the current API version in use.
|
||||
- `com.microsoft:channel-correlation-id` shall be set to the identifier of the device (also often referred to as `deviceId`).]*/
|
||||
- `com.microsoft:channel-correlation-id` shall be set to the string "methods:" followed by a guid.]*/
|
||||
/*Tests_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_017: [The endpoint used to for the sender and receiver link shall be `/devices/<device-id>/methods/devicebound`.]*/
|
||||
/*Tests_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_012: [The `AmqpDeviceMethodClient` object shall automatically establish the AMQP links required to receive method calls and send responses when either `onDeviceMethod` or `sendMethodResponse` is called.]*/
|
||||
attachSenderLink: function(ep, options, callback) {
|
||||
assert.strictEqual(ep, '/devices/' + fakeConfig.deviceId + '/methods/devicebound');
|
||||
assert.strictEqual(options.attach.properties['com.microsoft:api-version'], endpoint.apiVersion);
|
||||
assert.strictEqual(options.attach.properties['com.microsoft:channel-correlation-id'], fakeConfig.deviceId);
|
||||
assert(options.attach.properties['com.microsoft:channel-correlation-id'].startsWith('methods:'));
|
||||
callback(null, {});
|
||||
},
|
||||
attachReceiverLink: function(ep, options, callback) {
|
||||
assert.strictEqual(ep, '/devices/' + fakeConfig.deviceId + '/methods/devicebound');
|
||||
assert.strictEqual(options.attach.properties['com.microsoft:api-version'], endpoint.apiVersion);
|
||||
assert.strictEqual(options.attach.properties['com.microsoft:channel-correlation-id'], fakeConfig.deviceId);
|
||||
assert(options.attach.properties['com.microsoft:channel-correlation-id'].startsWith('methods:'));
|
||||
callback(null, new EventEmitter());
|
||||
},
|
||||
send: function(message, endpoint, to, sendCallback) {
|
||||
|
@ -265,10 +315,6 @@ describe('AmqpDeviceMethodClient', function () {
|
|||
payload: 'fakePayload'
|
||||
};
|
||||
var fakeAmqpClient = {
|
||||
/*Tests_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_014: [The `AmqpDeviceMethodClient` object shall set 2 properties of any AMQP link that it create:
|
||||
- `com.microsoft:api-version` shall be set to the current API version in use.
|
||||
- `com.microsoft:channel-correlation-id` shall be set to the identifier of the device (also often referred to as `deviceId`).]*/
|
||||
/*Tests_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_017: [The endpoint used to for the sender and receiver link shall be `/devices/<device-id>/methods/devicebound`.]*/
|
||||
attachSenderLink: sinon.stub(),
|
||||
attachReceiverLink: sinon.stub(),
|
||||
};
|
||||
|
@ -596,6 +642,6 @@ describe('AmqpDeviceMethodClient', function () {
|
|||
testCallback();
|
||||
});
|
||||
});
|
||||
})
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -11,10 +11,8 @@ var sinon = require('sinon');
|
|||
var AmqpMessage = require('azure-iot-amqp-base').AmqpMessage;
|
||||
var Message = require('azure-iot-common').Message;
|
||||
var Amqp = require('../lib/amqp.js').Amqp;
|
||||
var AmqpTwinClient = require('../lib/amqp_twin_client.js').AmqpTwinClient;
|
||||
var errors = require('azure-iot-common').errors;
|
||||
var results = require('azure-iot-common').results;
|
||||
var endpoint = require('azure-iot-common').endpoint;
|
||||
var AuthenticationType = require('azure-iot-common').AuthenticationType;
|
||||
|
||||
describe('Amqp', function () {
|
||||
|
@ -28,7 +26,6 @@ describe('Amqp', function () {
|
|||
|
||||
var testMessage = new Message();
|
||||
testMessage.transportObj = {};
|
||||
var testCallback = function () { };
|
||||
var configWithSSLOptions = { host: 'hub.host.name', deviceId: 'deviceId', x509: 'some SSL options' };
|
||||
var simpleSas = 'SharedAccessSignature sr=foo&sig=123&se=123';
|
||||
var configWithSAS = { host: 'hub.host.name', deviceId: 'deviceId', sharedAccessSignature: simpleSas};
|
||||
|
@ -97,6 +94,7 @@ describe('Amqp', function () {
|
|||
it('does not subscribe to the newTokenAvailable event if the authenticationProvider is x509', function () {
|
||||
fakeX509AuthenticationProvider.on = sinon.stub();
|
||||
var amqp = new Amqp(fakeX509AuthenticationProvider, fakeBaseClient);
|
||||
void(amqp);
|
||||
assert.isTrue(fakeX509AuthenticationProvider.on.notCalled);
|
||||
});
|
||||
|
||||
|
@ -104,7 +102,7 @@ describe('Amqp', function () {
|
|||
it('initiates a putToken when a newTokenAvailable event is received', function (testCallback) {
|
||||
var newSas = 'SharedAccessSignature sr=new&sig=456&se=456';
|
||||
transport.connect(function () {
|
||||
assert.isTrue(fakeBaseClient.putToken.calledOnce)
|
||||
assert.isTrue(fakeBaseClient.putToken.calledOnce);
|
||||
fakeTokenAuthenticationProvider.emit('newTokenAvailable', { sharedAccessSignature: newSas });
|
||||
assert.isTrue(fakeBaseClient.putToken.calledTwice);
|
||||
assert.strictEqual(fakeBaseClient.putToken.secondCall.args[1], newSas);
|
||||
|
@ -125,7 +123,7 @@ describe('Amqp', function () {
|
|||
fakeBaseClient.putToken = sinon.stub().callsArgWith(2, fakeError);
|
||||
fakeTokenAuthenticationProvider.emit('newTokenAvailable', { sharedAccessSignature: newSas });
|
||||
});
|
||||
})
|
||||
});
|
||||
});
|
||||
|
||||
describe('Direct Methods', function () {
|
||||
|
@ -195,7 +193,7 @@ describe('Amqp', function () {
|
|||
requestId: 'foo',
|
||||
payload: { key: 'value' },
|
||||
methodName: 'fakeMethod'
|
||||
}
|
||||
};
|
||||
|
||||
transport.connect(function () {
|
||||
transport.onDeviceMethod(fakeMethodRequest.methodName, function () {
|
||||
|
@ -312,7 +310,7 @@ describe('Amqp', function () {
|
|||
});
|
||||
});
|
||||
|
||||
describe('disableMethods', function (testCallback) {
|
||||
describe('disableMethods', function () {
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_044: [The `disableMethods` method shall call its `callback` immediately if the transport is already disconnected.]*/
|
||||
it('calls the callback immediately if the transport is disconnected', function (testCallback) {
|
||||
transport.disableMethods(function (err) {
|
||||
|
@ -460,13 +458,12 @@ describe('Amqp', function () {
|
|||
});
|
||||
|
||||
it('defers the call if already connected and authenticating', function (testCallback) {
|
||||
var connectErr = new Error('cannot connect');
|
||||
var authCallback;
|
||||
fakeBaseClient.initializeCBS = sinon.stub().callsFake(function (done) {
|
||||
authCallback = done;
|
||||
});
|
||||
|
||||
transport.connect(function (err) {
|
||||
transport.connect(function () {
|
||||
fakeBaseClient.initializeCBS = sinon.stub().callsArgWith(0, null);
|
||||
});
|
||||
|
||||
|
@ -548,7 +545,6 @@ describe('Amqp', function () {
|
|||
});
|
||||
|
||||
it('is deferred until connecting succeeds', function (testCallback) {
|
||||
var connectErr = new Error('cannot connect');
|
||||
var connectCallback;
|
||||
fakeBaseClient.connect = sinon.stub().callsFake(function (uri, options, done) {
|
||||
connectCallback = done;
|
||||
|
@ -579,7 +575,7 @@ describe('Amqp', function () {
|
|||
transport.connect(function () {
|
||||
transport.on('message', function () {});
|
||||
transport.enableC2D(function () {
|
||||
transport.disconnect(function (err, result) {
|
||||
transport.disconnect(function () {
|
||||
assert.isTrue(receiver.removeListener.calledWith('message'));
|
||||
assert.isTrue(receiver.removeListener.calledWith('error'));
|
||||
assert.isTrue(receiver.detach.calledOnce);
|
||||
|
@ -593,7 +589,7 @@ describe('Amqp', function () {
|
|||
it('detaches the D2C link if it is attached', function (testCallback) {
|
||||
transport.connect(function () {
|
||||
transport.sendEvent(new Message('foo'), function () {
|
||||
transport.disconnect(function (err, result) {
|
||||
transport.disconnect(function () {
|
||||
assert.isTrue(sender.removeListener.calledWith('error'));
|
||||
assert.isTrue(sender.detach.calledOnce);
|
||||
testCallback();
|
||||
|
@ -662,7 +658,7 @@ describe('Amqp', function () {
|
|||
assert.strictEqual(err.amqpError, fakeError);
|
||||
testCallback();
|
||||
});
|
||||
})
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -892,21 +888,34 @@ describe('Amqp', function () {
|
|||
});
|
||||
});
|
||||
|
||||
describe('D2C', function () {
|
||||
describe('#sendEvent', function () {
|
||||
|
||||
|
||||
[{
|
||||
functionUnderTest: 'sendEvent',
|
||||
invokeFunction: function(msg, callback) { transport.sendEvent(msg, callback); },
|
||||
expectedOutputName: null
|
||||
},
|
||||
{
|
||||
functionUnderTest: 'sendOutputEvent',
|
||||
invokeFunction: function(msg, callback) { transport.sendOutputEvent('_fake_output', msg, callback); },
|
||||
expectedOutputName: '_fake_output'
|
||||
}].forEach(function(testConfig) {
|
||||
describe('#' + testConfig.functionUnderTest, function () {
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_024: [The `sendEvent` method shall connect and authenticate the transport if necessary.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_005: [The `sendOutputEvent` method shall connect and authenticate the transport if necessary.]*/
|
||||
it('automatically connects the transport if necessary', function (testCallback) {
|
||||
transport.sendEvent(new Message('test'), function () {
|
||||
testConfig.invokeFunction(new Message('test'), function () {
|
||||
assert(fakeBaseClient.connect.calledOnce);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_009: [If `sendOutputEvent` encounters an error before it can send the request, it shall invoke the `done` callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/
|
||||
it('forwards the error if connecting fails while trying to send a message', function (testCallback) {
|
||||
var fakeError = new Error('failed to connect');
|
||||
fakeBaseClient.connect = sinon.stub().callsArgWith(2, fakeError);
|
||||
|
||||
transport.sendEvent(new Message('test'), function (err) {
|
||||
testConfig.invokeFunction(new Message('test'), function (err) {
|
||||
assert(fakeBaseClient.connect.calledOnce);
|
||||
assert.strictEqual(err.amqpError, fakeError);
|
||||
testCallback();
|
||||
|
@ -923,7 +932,7 @@ describe('Amqp', function () {
|
|||
|
||||
transport.connect(function () {});
|
||||
|
||||
transport.sendEvent(new Message('test'), function () {
|
||||
testConfig.invokeFunction(new Message('test'), function () {
|
||||
assert(fakeBaseClient.connect.calledOnce);
|
||||
testCallback();
|
||||
});
|
||||
|
@ -941,7 +950,7 @@ describe('Amqp', function () {
|
|||
|
||||
transport.connect(function () {});
|
||||
|
||||
transport.sendEvent(new Message('test'), function () {
|
||||
testConfig.invokeFunction(new Message('test'), function () {
|
||||
assert(fakeBaseClient.connect.calledOnce);
|
||||
testCallback();
|
||||
});
|
||||
|
@ -950,11 +959,12 @@ describe('Amqp', function () {
|
|||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_025: [The `sendEvent` method shall create and attach the d2c link if necessary.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_006: [The `sendOutputEvent` method shall create and attach the d2c link if necessary.]*/
|
||||
it('attaches the messaging link on first send, then reuses it', function (testCallback) {
|
||||
transport.sendEvent(new Message('test'), function () {
|
||||
testConfig.invokeFunction(new Message('test'), function () {
|
||||
assert(fakeBaseClient.attachSenderLink.calledOnce);
|
||||
assert(sender.on.calledOnce);
|
||||
transport.sendEvent(new Message('test2'), function () {
|
||||
testConfig.invokeFunction(new Message('test2'), function () {
|
||||
assert(fakeBaseClient.attachSenderLink.calledOnce);
|
||||
assert(sender.send.calledTwice);
|
||||
testCallback();
|
||||
|
@ -972,7 +982,7 @@ describe('Amqp', function () {
|
|||
testCallback();
|
||||
});
|
||||
|
||||
transport.sendEvent(new Message('test'), function (err) {
|
||||
testConfig.invokeFunction(new Message('test'), function () {
|
||||
sender.emit('error', fakeError);
|
||||
});
|
||||
});
|
||||
|
@ -986,7 +996,7 @@ describe('Amqp', function () {
|
|||
transport.connect(function () {
|
||||
assert(fakeBaseClient.connect.calledOnce);
|
||||
transport.disconnect(function () {});
|
||||
transport.sendEvent(new Message('test'), function () {
|
||||
testConfig.invokeFunction(new Message('test'), function () {
|
||||
assert(fakeBaseClient.connect.calledTwice);
|
||||
testCallback();
|
||||
});
|
||||
|
@ -998,7 +1008,7 @@ describe('Amqp', function () {
|
|||
it('calls the callback with an error if attaching the link fails', function (testCallback) {
|
||||
var fakeError = new Error('fake');
|
||||
fakeBaseClient.attachSenderLink = sinon.stub().callsArgWith(2, fakeError);
|
||||
transport.sendEvent(new Message('test'), function (err) {
|
||||
testConfig.invokeFunction(new Message('test'), function (err) {
|
||||
assert.strictEqual(err.amqpError, fakeError);
|
||||
testCallback();
|
||||
});
|
||||
|
@ -1007,14 +1017,14 @@ describe('Amqp', function () {
|
|||
it('calls the callback with an error if sending the message fails', function (testCallback) {
|
||||
var fakeError = new Error('fake');
|
||||
sender.send = sinon.stub().callsArgWith(1, fakeError);
|
||||
transport.sendEvent(new Message('test'), function (err) {
|
||||
testConfig.invokeFunction(new Message('test'), function (err) {
|
||||
assert.strictEqual(err.amqpError, fakeError);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
it('registers an error event handler on the d2c link', function (testCallback) {
|
||||
transport.sendEvent(new Message('test'), function (err) {
|
||||
testConfig.invokeFunction(new Message('test'), function () {
|
||||
assert.isTrue(sender.on.calledWith('error'));
|
||||
assert.doesNotThrow(function () {
|
||||
sender.emit('error', new Error());
|
||||
|
@ -1022,6 +1032,25 @@ describe('Amqp', function () {
|
|||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_002: [The `sendEvent` method shall construct an AMQP request using the message passed in argument as the body of the message.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_003: [The `sendEvent` method shall call the `done` callback with a null error object and a MessageEnqueued result object when the message has been successfully sent.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_007: [The `sendOutputEvent` method shall construct an AMQP request using the message passed in argument as the body of the message.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_012: [The `sendOutputEvent` method shall set the annotation "x-opt-output-name" on the message to the `outputName`.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_008: [The `sendOutputEvent` method shall call the `done` callback with a null error object and a MessageEnqueued result object when the message has been successfully sent.]*/
|
||||
it ('constructs a request correctly and succeeds correctly', function (testCallback) {
|
||||
testConfig.invokeFunction(new Message('test'), function (err, result) {
|
||||
var sentMsg = sender.send.firstCall.args[0];
|
||||
assert.instanceOf(sentMsg, AmqpMessage);
|
||||
assert.instanceOf(result, results.MessageEnqueued);
|
||||
assert.strictEqual(sentMsg.body, 'test');
|
||||
if (testConfig.expectedOutputName) {
|
||||
assert.strictEqual(sentMsg.messageAnnotations['x-opt-output-name'], testConfig.expectedOutputName);
|
||||
}
|
||||
testCallback(err);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -1092,120 +1121,130 @@ describe('Amqp', function () {
|
|||
});
|
||||
});
|
||||
|
||||
describe('enableC2D', function () {
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_031: [The `enableC2D` method shall connect and authenticate the transport if it is disconnected.]*/
|
||||
it('connects the transport if it is disconnected', function (testCallback) {
|
||||
transport.enableC2D(function () {
|
||||
assert(fakeBaseClient.connect.calledOnce);
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/
|
||||
it('attaches the C2D link', function (testCallback) {
|
||||
transport.connect(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.notCalled);
|
||||
transport.enableC2D(function () {
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_010: [The `enableInputMessages` method shall enable C2D messages]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_011: [The `disableInputMessages` method shall disable C2D messages]*/
|
||||
[{
|
||||
enableFunc: 'enableC2D',
|
||||
disableFunc: 'disableC2D'
|
||||
},{
|
||||
enableFunc: 'enableInputMessages',
|
||||
disableFunc: 'disableInputMessages'
|
||||
}].forEach(function(testConfig) {
|
||||
describe(testConfig.enableFunc, function () {
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_031: [The `enableC2D` method shall connect and authenticate the transport if it is disconnected.]*/
|
||||
it('connects the transport if it is disconnected', function (testCallback) {
|
||||
transport[testConfig.enableFunc](function () {
|
||||
assert(fakeBaseClient.connect.calledOnce);
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/
|
||||
it('calls its callback with an Error if connecting the transport fails', function (testCallback) {
|
||||
fakeBaseClient.connect = sinon.stub().callsArgWith(2, new Error('fake failed to connect'));
|
||||
transport.enableC2D(function (err) {
|
||||
assert(fakeBaseClient.connect.calledOnce);
|
||||
assert.instanceOf(err, Error);
|
||||
testCallback();
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/
|
||||
it('attaches the C2D link', function (testCallback) {
|
||||
transport.connect(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.notCalled);
|
||||
transport[testConfig.enableFunc](function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('calls its callback with an Error if attaching the C2D link fails', function (testCallback) {
|
||||
fakeBaseClient.attachReceiverLink = sinon.stub().callsArgWith(2, new Error('fake failed to attach'));
|
||||
transport.connect(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.notCalled);
|
||||
transport.enableC2D(function (err) {
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/
|
||||
it('calls its callback with an Error if connecting the transport fails', function (testCallback) {
|
||||
fakeBaseClient.connect = sinon.stub().callsArgWith(2, new Error('fake failed to connect'));
|
||||
transport[testConfig.enableFunc](function (err) {
|
||||
assert(fakeBaseClient.connect.calledOnce);
|
||||
assert.instanceOf(err, Error);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_034: [Any `error` event received on the C2D link shall trigger the emission of an `error` event by the transport, with an argument that is a `C2DDetachedError` object with the `innerError` property set to that error.]*/
|
||||
// disabled until the client supports it
|
||||
it('emits a CloudToDeviceDetachedError with an innerError property if the link fails after being established correctly', function (testCallback) {
|
||||
var fakeError = new Error('fake C2D receiver link error');
|
||||
transport.on('error', function (err) {
|
||||
assert.instanceOf(err, errors.CloudToDeviceDetachedError);
|
||||
assert.strictEqual(err.innerError, fakeError);
|
||||
testCallback();
|
||||
});
|
||||
|
||||
transport.connect(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.notCalled);
|
||||
transport.enableC2D(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
receiver.emit('error', fakeError);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('forwards messages to the client once connected and authenticated', function (testCallback) {
|
||||
var fakeMessage = new AmqpMessage();
|
||||
|
||||
transport.on('message', function (msg) {
|
||||
assert.instanceOf(msg, Message);
|
||||
assert.strictEqual(msg.transportObj, fakeMessage);
|
||||
testCallback();
|
||||
});
|
||||
|
||||
transport.connect(function () {
|
||||
transport.enableC2D(function () {
|
||||
receiver.emit('message', fakeMessage);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('disableC2D', function (testCallback) {
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_037: [The `disableC2D` method shall call its `callback` immediately if the transport is already disconnected.]*/
|
||||
it('calls the callback immediately if the transport is disconnected', function (testCallback) {
|
||||
transport.disableC2D(function (err) {
|
||||
assert.isNotOk(err);
|
||||
assert(receiver.detach.notCalled);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/
|
||||
it('detaches the C2D link', function (testCallback) {
|
||||
transport.connect(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.notCalled);
|
||||
transport.enableC2D(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
transport.disableC2D(function () {
|
||||
assert(receiver.detach.calledOnce);
|
||||
it('calls its callback with an Error if attaching the C2D link fails', function (testCallback) {
|
||||
fakeBaseClient.attachReceiverLink = sinon.stub().callsArgWith(2, new Error('fake failed to attach'));
|
||||
transport.connect(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.notCalled);
|
||||
transport[testConfig.enableFunc](function (err) {
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
assert.instanceOf(err, Error);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_034: [Any `error` event received on the C2D link shall trigger the emission of an `error` event by the transport, with an argument that is a `C2DDetachedError` object with the `innerError` property set to that error.]*/
|
||||
// disabled until the client supports it
|
||||
it('emits a CloudToDeviceDetachedError with an innerError property if the link fails after being established correctly', function (testCallback) {
|
||||
var fakeError = new Error('fake C2D receiver link error');
|
||||
transport.on('error', function (err) {
|
||||
assert.instanceOf(err, errors.CloudToDeviceDetachedError);
|
||||
assert.strictEqual(err.innerError, fakeError);
|
||||
testCallback();
|
||||
});
|
||||
|
||||
transport.connect(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.notCalled);
|
||||
transport[testConfig.enableFunc](function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
receiver.emit('error', fakeError);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('forwards messages to the client once connected and authenticated', function (testCallback) {
|
||||
var fakeMessage = new AmqpMessage();
|
||||
|
||||
transport.on('message', function (msg) {
|
||||
assert.instanceOf(msg, Message);
|
||||
assert.strictEqual(msg.transportObj, fakeMessage);
|
||||
testCallback();
|
||||
});
|
||||
|
||||
transport.connect(function () {
|
||||
transport[testConfig.enableFunc](function () {
|
||||
receiver.emit('message', fakeMessage);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/
|
||||
it('calls its callback with an Error if an error happens while detaching the C2D link', function (testCallback) {
|
||||
receiver.detach = sinon.stub().callsArgWith(0, new Error('fake detach error'));
|
||||
transport.connect(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.notCalled);
|
||||
transport.enableC2D(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
transport.disableC2D(function (err) {
|
||||
assert(receiver.detach.calledOnce);
|
||||
assert.instanceOf(err, Error);
|
||||
testCallback();
|
||||
describe(testConfig.disableFunc, function () {
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_037: [The `disableC2D` method shall call its `callback` immediately if the transport is already disconnected.]*/
|
||||
it('calls the callback immediately if the transport is disconnected', function (testCallback) {
|
||||
transport[testConfig.disableFunc](function (err) {
|
||||
assert.isNotOk(err);
|
||||
assert(receiver.detach.notCalled);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/
|
||||
it('detaches the C2D link', function (testCallback) {
|
||||
transport.connect(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.notCalled);
|
||||
transport[testConfig.enableFunc](function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
transport[testConfig.disableFunc](function () {
|
||||
assert(receiver.detach.calledOnce);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/
|
||||
it('calls its callback with an Error if an error happens while detaching the C2D link', function (testCallback) {
|
||||
receiver.detach = sinon.stub().callsArgWith(0, new Error('fake detach error'));
|
||||
transport.connect(function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.notCalled);
|
||||
transport[testConfig.enableFunc](function () {
|
||||
assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint));
|
||||
transport[testConfig.disableFunc](function (err) {
|
||||
assert(receiver.detach.calledOnce);
|
||||
assert.instanceOf(err, Error);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -1423,16 +1462,48 @@ describe('Amqp', function () {
|
|||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_013: [If `amqp` receives a message on the C2D link without an annotation named "x-opt-input-name", it shall emit a "message" event with the message as the event parameter.]*/
|
||||
describe('on(\'message\')', function () {
|
||||
it('calls the message handler when message received', function (testCallback) {
|
||||
var testText = '__TEST_TEXT__';
|
||||
transport.connect(function () {
|
||||
transport.on('message', function (msg) {
|
||||
assert.strictEqual(msg.data, testText);
|
||||
testCallback();
|
||||
});
|
||||
transport.enableC2D(function (err) {
|
||||
assert(!err);
|
||||
receiver.emit('message', AmqpMessage.fromMessage(new Message(testText)));
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_014: [If `amqp` receives a message on the C2D link with an annotation named "x-opt-input-name", it shall emit a "message" event with the "x-opt-input-name" annotation as the first parameter and the message as the second parameter.]*/
|
||||
describe('on(\'inputMessage\')', function () {
|
||||
it('calls the message handler when message received', function (testCallback) {
|
||||
var testText = '__TEST_TEXT__';
|
||||
var testInputName = '__INPUT__';
|
||||
transport.connect(function () {
|
||||
transport.on('inputMessage', function (inputName, msg) {
|
||||
assert.strictEqual(inputName, testInputName);
|
||||
assert.strictEqual(msg.data, testText);
|
||||
testCallback();
|
||||
});
|
||||
transport.enableInputMessages(function (err) {
|
||||
assert(!err);
|
||||
var amqpMessage = AmqpMessage.fromMessage(new Message(testText));
|
||||
amqpMessage.messageAnnotations = { 'x-opt-input-name': testInputName };
|
||||
receiver.emit('message', amqpMessage);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_16_052: [The `sendEventBatch` method shall throw a `NotImplementedError`.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_001: [`enableInputMessages` shall throw a `NotImplementedError`.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_002: [`disableInputMessages` shall throw a `NotImplementedError`.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_003: [`sendOutputEvent` shall throw a `NotImplementedError`.]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_18_004: [`sendOutputEventBatch` shall throw a `NotImplementedError`.]*/
|
||||
[
|
||||
'sendEventBatch',
|
||||
'enableInputMessages',
|
||||
'disableInputMessages',
|
||||
'sendOutputEvent',
|
||||
'sendOutputEventBatch'
|
||||
].forEach(function (methodName) {
|
||||
describe('#' + methodName, function () {
|
||||
|
@ -1440,7 +1511,8 @@ describe('Amqp', function () {
|
|||
assert.throws(function () {
|
||||
transport[methodName]();
|
||||
}, errors.NotImplementedError);
|
||||
})
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -8,26 +8,24 @@ var EventEmitter = require('events').EventEmitter;
|
|||
var sinon = require('sinon');
|
||||
|
||||
var AmqpTwinClient = require('../lib/amqp_twin_client.js').AmqpTwinClient;
|
||||
var Amqp = require('../lib/amqp').Amqp;
|
||||
var AmqpMessage = require('azure-iot-amqp-base').AmqpMessage;
|
||||
var errors = require('azure-iot-common').errors;
|
||||
var endpoint = require('azure-iot-common').endpoint;
|
||||
|
||||
describe('AmqpTwinClient', function () {
|
||||
|
||||
var fakeConfig = {
|
||||
deviceId: 'deviceId'
|
||||
};
|
||||
|
||||
var fakeAuthenticationProvider, fakeAmqpClient, fakeSenderLink, fakeReceiverLink, twinClient;
|
||||
var fakeConfig, fakeAuthenticationProvider, fakeAmqpClient, fakeSenderLink, fakeReceiverLink, twinClient;
|
||||
|
||||
beforeEach(function () {
|
||||
fakeAuthenticationProvider = {
|
||||
getDeviceCredentials: sinon.stub().callsArgWith(0, null, fakeConfig)
|
||||
};
|
||||
|
||||
fakeConfig = {
|
||||
deviceId: 'deviceId'
|
||||
};
|
||||
|
||||
fakeSenderLink = new EventEmitter();
|
||||
fakeSenderLink.send = sinon.stub().callsArg(1)
|
||||
fakeSenderLink.send = sinon.stub().callsArg(1);
|
||||
|
||||
fakeReceiverLink = new EventEmitter();
|
||||
|
||||
|
@ -87,12 +85,25 @@ describe('AmqpTwinClient', function () {
|
|||
assert.isTrue(fakeAmqpClient.attachReceiverLink.calledOnce);
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_TWIN_06v_007: [The endpoint argument for attachReceiverLink shall be `/devices/<deviceId>/twin`.] */
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_TWIN_06_007: [The endpoint argument for attachReceiverLink shall be `/devices/<deviceId>/twin`.] */
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_TWIN_06_009: [The endpoint argument for attachSenderLink shall be `/devices/<deviceId>/twin`.] */
|
||||
it('attaches sender and receiver links to the `/devices/<deviceId>/twin` endpoint.', function() {
|
||||
methodUnderTest();
|
||||
assert.isTrue(fakeAmqpClient.attachSenderLink.calledWith('/devices/' + fakeConfig.deviceId + '/twin'));
|
||||
assert.isTrue(fakeAmqpClient.attachReceiverLink.calledWith('/devices/' + fakeConfig.deviceId + '/twin'));
|
||||
assert.strictEqual(fakeAmqpClient.attachSenderLink.firstCall.args[0],'/devices/' + fakeConfig.deviceId + '/twin');
|
||||
assert.strictEqual(fakeAmqpClient.attachReceiverLink.firstCall.args[0],'/devices/' + fakeConfig.deviceId + '/twin');
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_TWIN_18_001: [If a `moduleId` value was set in the device's connection string, the endpoint argument for `attachReceiverLink` shall be `/devices/<deviceId>/modules/<moduleId>/twin`]*/
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_TWIN_18_002: [If a `moduleId` value was set in the device's connection string, the endpoint argument for `attachSenderLink` shall be `/device/<deviceId>/modules/<moduleId>/twin`.]*/
|
||||
it('attaches sender and receiver links to the `/devices/<deviceId>/modules/<moduleId>/twin` endpoint when there\'s a moduleId', function() {
|
||||
fakeConfig.moduleId = "fakeModuleId";
|
||||
fakeAuthenticationProvider = {
|
||||
getDeviceCredentials: sinon.stub().callsArgWith(0, null, fakeConfig)
|
||||
};
|
||||
twinClient = new AmqpTwinClient(fakeAuthenticationProvider, fakeAmqpClient);
|
||||
methodUnderTest();
|
||||
assert.strictEqual(fakeAmqpClient.attachSenderLink.firstCall.args[0],'/devices/' + fakeConfig.deviceId + '/modules/' + fakeConfig.moduleId + '/twin');
|
||||
assert.strictEqual(fakeAmqpClient.attachReceiverLink.firstCall.args[0],'/devices/' + fakeConfig.deviceId + '/modules/' + fakeConfig.moduleId + '/twin');
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_AMQP_TWIN_06_010: [** The link options argument for attachSenderLink shall be:
|
||||
|
@ -577,10 +588,10 @@ describe('AmqpTwinClient', function () {
|
|||
});
|
||||
|
||||
it('ignores a message that has no correlationId and no data', function () {
|
||||
twinClient.on('twinDesiredPropertiesUpdate', function (delta) {
|
||||
twinClient.on('twinDesiredPropertiesUpdate', function () {
|
||||
assert.fail();
|
||||
});
|
||||
twinClient.on('error', function (delta) {
|
||||
twinClient.on('error', function () {
|
||||
assert.fail();
|
||||
});
|
||||
|
||||
|
@ -596,7 +607,7 @@ describe('AmqpTwinClient', function () {
|
|||
},
|
||||
data: undefined
|
||||
});
|
||||
})
|
||||
});
|
||||
});
|
||||
|
||||
describe('error', function () {
|
||||
|
@ -607,7 +618,7 @@ describe('AmqpTwinClient', function () {
|
|||
assert.isTrue(fakeAmqpClient.detachSenderLink.calledOnce);
|
||||
assert.isTrue(fakeAmqpClient.detachReceiverLink.calledOnce);
|
||||
testCallback();
|
||||
})
|
||||
});
|
||||
twinClient.enableTwinDesiredPropertiesUpdates(function () {
|
||||
fakeSenderLink.emit('error', fakeError);
|
||||
});
|
||||
|
@ -629,7 +640,7 @@ describe('AmqpTwinClient', function () {
|
|||
assert.isTrue(fakeAmqpClient.detachSenderLink.calledOnce);
|
||||
assert.isTrue(fakeAmqpClient.detachReceiverLink.calledOnce);
|
||||
testCallback();
|
||||
})
|
||||
});
|
||||
twinClient.enableTwinDesiredPropertiesUpdates(function () {
|
||||
fakeReceiverLink.emit('error', fakeError);
|
||||
});
|
||||
|
|
|
@ -49,8 +49,9 @@
|
|||
"job_client": "mocha --reporter spec test/job_client.js",
|
||||
"authentication": "mocha --reporter spec test/authentication.js",
|
||||
"provisioning": "cd ../provisioning/e2e && npm run ci && cd ../../e2etests",
|
||||
"configurations": "mocha --reporter spec test/configurations.js",
|
||||
"modules": "mocha --reporter spec test/modules.js",
|
||||
"phase1_fast": "npm-run-all -p -l twin_disconnect service registry device_acknowledge_tests upload_disconnect authentication job_client",
|
||||
"phase1_fast": "npm-run-all -p -l twin_disconnect service registry device_acknowledge_tests upload_disconnect authentication job_client module_crud module_messaging module_twin module_methods",
|
||||
"phase2_slow": "npm-run-all -p -l device_method twin_e2e_tests sas_token_tests device_service provisioning",
|
||||
"alltest": "npm run phase1_fast && npm run phase2_slow",
|
||||
"ci": "npm -s run lint && npm -s run alltest",
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
'use strict';
|
||||
|
||||
var eventHubClient = require('azure-event-hubs').Client;
|
||||
var util = require('util');
|
||||
var EventEmitter = require('events');
|
||||
var closeDeviceEventHubClients = require('./testUtils.js').closeDeviceEventHubClients;
|
||||
|
||||
var hubConnectionString = process.env.IOTHUB_CONNECTION_STRING;
|
||||
|
||||
var EventHubReceiverHelper = function() {
|
||||
};
|
||||
util.inherits(EventHubReceiverHelper, EventEmitter);
|
||||
|
||||
EventHubReceiverHelper.prototype.openClient = function(done) {
|
||||
var self = this;
|
||||
this.ehClient = eventHubClient.fromConnectionString(hubConnectionString);
|
||||
this.ehReceivers = [];
|
||||
|
||||
this.ehClient.open()
|
||||
.then(self.ehClient.getPartitionIds.bind(self.ehClient))
|
||||
.then(function (partitionIds) {
|
||||
return partitionIds.map(function (partitionId) {
|
||||
return self.ehClient.createReceiver('$Default', partitionId,{ 'startAfterTime' : Date.now()}).then(function(receiver) {
|
||||
self.ehReceivers.push(receiver);
|
||||
receiver.on('errorReceived', function(err) {
|
||||
self.emit(err);
|
||||
});
|
||||
receiver.on('message', function (eventData) {
|
||||
self.emit('message', eventData);
|
||||
});
|
||||
});
|
||||
});
|
||||
})
|
||||
.catch(function(err) {
|
||||
done(err);
|
||||
})
|
||||
.then(function() {
|
||||
done();
|
||||
});
|
||||
};
|
||||
|
||||
EventHubReceiverHelper.prototype.closeClient = function(done) {
|
||||
closeDeviceEventHubClients(null, this.ehClient, this.ehReceivers, done);
|
||||
};
|
||||
|
||||
module.exports = EventHubReceiverHelper;
|
|
@ -8,21 +8,11 @@ var uuid = require('uuid');
|
|||
var DeviceIdentityHelper = require('./device_identity_helper.js');
|
||||
var async = require('async');
|
||||
var assert = require('chai').assert;
|
||||
var debug = require('debug')('e2etests:modules');
|
||||
var debug = require('debug')('e2etests:module_crud');
|
||||
|
||||
var hubConnectionString = process.env.IOTHUB_CONNECTION_STRING;
|
||||
var registry = Registry.fromConnectionString(hubConnectionString);
|
||||
|
||||
// MISSING SCENARIOS
|
||||
//
|
||||
// The following scenarios are missing because modules, module twins, and
|
||||
// module methods are not yet supported for devices on IoT Hub
|
||||
//
|
||||
// 1. create a module, get the connection string, and connect using the device API.
|
||||
// 2. set a module twin and verify desired properties from device API.
|
||||
// 3. set reported properties from device api and verify from service api.
|
||||
// 4. invoke a device method and verify using device api
|
||||
//
|
||||
describe('modules', function() {
|
||||
var module;
|
||||
var deviceId = null;
|
||||
|
@ -174,7 +164,7 @@ describe('modules', function() {
|
|||
});
|
||||
|
||||
// Skipped because of failure
|
||||
it.skip ('can add and remove a module', function(done) {
|
||||
it ('can add and remove a module', function(done) {
|
||||
async.series([
|
||||
function addModule(callback) {
|
||||
debug('adding module with deviceId = ' + module.deviceId + ' and moduleId ' + module.moduleId);
|
|
@ -0,0 +1,130 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
'use strict';
|
||||
|
||||
var uuid = require('uuid');
|
||||
var debug = require('debug')('e2etests:module-identity-helper');
|
||||
var async = require('async');
|
||||
|
||||
var Registry = require('azure-iothub').Registry;
|
||||
var ConnectionString = require('azure-iot-common').ConnectionString;
|
||||
var DeviceClient = require('azure-iot-device').Client;
|
||||
var ServiceClient = require('azure-iothub').Client;
|
||||
var DeviceIdentityHelper = require('./device_identity_helper.js');
|
||||
|
||||
var hubConnectionString = process.env.IOTHUB_CONNECTION_STRING;
|
||||
var registry = Registry.fromConnectionString(hubConnectionString);
|
||||
var serviceClient = ServiceClient.fromConnectionString(hubConnectionString);
|
||||
|
||||
module.exports.createModule = function(testModule, Transport, done) {
|
||||
async.series([
|
||||
function createDeviceIfNecessary(done) {
|
||||
if (testModule.testDevice) {
|
||||
done();
|
||||
} else {
|
||||
testModule.registry = registry;
|
||||
testModule.serviceClient = serviceClient;
|
||||
|
||||
debug('Creating SAS device to use with test');
|
||||
DeviceIdentityHelper.createDeviceWithSas(function (err, testDevice) {
|
||||
debug('createDeviceWithSas returned ' + (err ? err : 'success'));
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
testModule.testDevice = testDevice;
|
||||
testModule.deviceId = testDevice.deviceId;
|
||||
done();
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
function createModule(done) {
|
||||
testModule.moduleId = 'node_e2e_' + uuid.v4();
|
||||
debug('creating module with id ' + testModule.moduleId);
|
||||
registry.addModule({deviceId: testModule.deviceId, moduleId: testModule.moduleId}, function(err) {
|
||||
debug('addModule returned ' + (err ? err : 'success'));
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
debug('getting module with deviceId = ' + testModule.deviceId + ' and moduleId ' + testModule.moduleId);
|
||||
registry.getModule(testModule.deviceId, testModule.moduleId, function(err, foundModule) {
|
||||
debug('getModule returned ' + (err ? err : 'success'));
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
var hubName = ConnectionString.parse(hubConnectionString).HostName;
|
||||
testModule.moduleConnectionString = 'HostName=' + hubName + ';DeviceId=' + foundModule.deviceId + ';ModuleId='+foundModule.moduleId+';SharedAccessKey=' + foundModule.authentication.symmetricKey.primaryKey;
|
||||
done();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
function connectDeviceClient(done) {
|
||||
testModule.deviceClient = DeviceClient.fromConnectionString(testModule.moduleConnectionString, Transport);
|
||||
debug('opening device client');
|
||||
testModule.deviceClient.open(function(err) {
|
||||
debug('deviceClient.open returned ' + (err ? err : 'success'));
|
||||
done(err);
|
||||
});
|
||||
}
|
||||
], done);
|
||||
};
|
||||
|
||||
module.exports.createModuleTwinObjects = function(testModule, done) {
|
||||
async.series([
|
||||
function createServiceTwin(done) {
|
||||
debug('creating service twin');
|
||||
registry.getModuleTwin(testModule.deviceId, testModule.moduleId, function(err, twin) {
|
||||
debug('getModuleTwin returned ' + (err ? err : 'success'));
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
testModule.serviceTwin = twin;
|
||||
done();
|
||||
}
|
||||
});
|
||||
},
|
||||
function createDeviceTwin(done) {
|
||||
debug('getting device twin');
|
||||
testModule.deviceClient.getTwin(function(err, twin) {
|
||||
debug('getTwin returned ' + (err ? err : 'success'));
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
testModule.deviceTwin = twin;
|
||||
done();
|
||||
}
|
||||
});
|
||||
}
|
||||
], done);
|
||||
};
|
||||
|
||||
module.exports.cleanUpAfterTest = function(testModule, done) {
|
||||
async.series([
|
||||
function closeDeviceClient(done) {
|
||||
if (testModule.deviceClient) {
|
||||
debug('closing device client');
|
||||
testModule.deviceClient.close(function(err) {
|
||||
debug('deviceClient.close returned ' + (err ? err : 'success'));
|
||||
done(err);
|
||||
});
|
||||
} else {
|
||||
done();
|
||||
}
|
||||
},
|
||||
function removeDeviceFromRegistry(done) {
|
||||
if (testModule.testDevice && testModule.testDevice.deviceId) {
|
||||
debug('deleting device with deviceId ' + testModule.testDevice.deviceId);
|
||||
DeviceIdentityHelper.deleteDevice(testModule.testDevice.deviceId, function(err) {
|
||||
debug('deleteDevice returned ' + (err ? err : 'success'));
|
||||
testModule.testDevice.deviceId = null;
|
||||
done(err);
|
||||
});
|
||||
} else {
|
||||
done();
|
||||
}
|
||||
}
|
||||
], done);
|
||||
};
|
|
@ -0,0 +1,78 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
'use strict';
|
||||
|
||||
var ModuleIdentityHelper = require('./module_identity_helper.js');
|
||||
var EventHubReceiverHelper = require('./eventhub_receiver_helper');
|
||||
var Message = require('azure-iot-common').Message;
|
||||
var assert = require('chai').assert;
|
||||
var debug = require('debug')('e2etests:module-messaging');
|
||||
var Amqp = require('azure-iot-device-amqp').Amqp;
|
||||
var AmqpWs = require('azure-iot-device-amqp').AmqpWs;
|
||||
//var Mqtt = require('azure-iot-device-mqtt').Mqtt;
|
||||
//var MqttWs = require('azure-iot-device-mqtt').MqttWs;
|
||||
|
||||
var transportsToTest = [ Amqp, AmqpWs ];
|
||||
|
||||
describe('module messaging', function() {
|
||||
this.timeout(46000);
|
||||
|
||||
transportsToTest.forEach(function(Transport) {
|
||||
describe('using ' + Transport.name, function() {
|
||||
var testModule = {};
|
||||
|
||||
before(function(done) {
|
||||
ModuleIdentityHelper.createModule(testModule, Transport, done);
|
||||
});
|
||||
|
||||
after(function(done) {
|
||||
ModuleIdentityHelper.cleanUpAfterTest(testModule, done);
|
||||
});
|
||||
|
||||
it ('Can send from service to module input', function(done) {
|
||||
var testInputName = '__input__';
|
||||
var testMessageText = '__message__';
|
||||
|
||||
testModule.deviceClient.on('inputMessage', function (inputName, msg) {
|
||||
assert.strictEqual(inputName, testInputName);
|
||||
assert.strictEqual(msg.getBytes().toString('ascii'), testMessageText);
|
||||
done();
|
||||
});
|
||||
|
||||
var messageToSend = new Message(testMessageText);
|
||||
debug('sending message to input named ' + testInputName);
|
||||
testModule.serviceClient.sendToModule(testModule.deviceId, testModule.moduleId, testInputName, messageToSend, function(err) {
|
||||
debug('sendToModule returned ' + (err ? err : 'success'));
|
||||
});
|
||||
});
|
||||
|
||||
it ('Can send from module output to service', function(done) {
|
||||
var testOutputName = '__output__';
|
||||
var testOutputText = '__test_output_text__';
|
||||
|
||||
var ehReceiver = new EventHubReceiverHelper();
|
||||
after(function(done) {
|
||||
ehReceiver.closeClient(done);
|
||||
});
|
||||
ehReceiver.openClient(function(err) {
|
||||
if (err) {
|
||||
done (err);
|
||||
} else {
|
||||
ehReceiver.on('message', function(msg) {
|
||||
if (msg.properties.to === '/devices/' + testModule.deviceId + '/modules/' + testModule.moduleId + '/messages/events') {
|
||||
assert.strictEqual(msg.annotations['x-opt-output-name'], testOutputName);
|
||||
assert.strictEqual(msg.body.toString('ascii'), testOutputText);
|
||||
done();
|
||||
}
|
||||
});
|
||||
debug('sending message to output named ' + testOutputName);
|
||||
testModule.deviceClient.sendOutputEvent(testOutputName, new Message(testOutputText), function(err) {
|
||||
debug('sendOutputEvent returned ' + (err ? err : 'success'));
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,75 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
'use strict';
|
||||
|
||||
var ModuleIdentityHelper = require('./module_identity_helper.js');
|
||||
var assert = require('chai').assert;
|
||||
var debug = require('debug')('e2etests:module-methods');
|
||||
var Amqp = require('azure-iot-device-amqp').Amqp;
|
||||
var AmqpWs = require('azure-iot-device-amqp').AmqpWs;
|
||||
//var Mqtt = require('azure-iot-device-mqtt').Mqtt;
|
||||
//var MqttWs = require('azure-iot-device-mqtt').MqttWs;
|
||||
|
||||
var transportsToTest = [ Amqp, AmqpWs ];
|
||||
|
||||
describe('module methods', function() {
|
||||
this.timeout(46000);
|
||||
|
||||
transportsToTest.forEach(function(Transport) {
|
||||
describe('using ' + Transport.name, function() {
|
||||
var testModule = {};
|
||||
|
||||
before(function(done) {
|
||||
ModuleIdentityHelper.createModule(testModule, Transport, done);
|
||||
});
|
||||
|
||||
after(function(done) {
|
||||
ModuleIdentityHelper.cleanUpAfterTest(testModule, done);
|
||||
});
|
||||
|
||||
it ('can receive a method call', function(done) {
|
||||
var methodName = 'my_method';
|
||||
var requestPayload = {
|
||||
fakePayloadKey: '__FAKE_PAYLOAD_VALUE__'
|
||||
};
|
||||
var methodResult = 400;
|
||||
var responsePayload = {
|
||||
anotherFakePayloadKey: '__ANOTHER_FAKE_PAYLOAD_VALUE__'
|
||||
};
|
||||
var methodParams = {
|
||||
methodName: methodName,
|
||||
payload: requestPayload,
|
||||
responseTimeoutInSeconds: 15
|
||||
};
|
||||
|
||||
testModule.deviceClient.onDeviceMethod(methodName, function(request, response) {
|
||||
debug('received method call');
|
||||
assert.strictEqual(request.methodName, methodName);
|
||||
assert.deepEqual(request.payload, requestPayload);
|
||||
|
||||
debug('sending method response with statusCode: ' + methodResult);
|
||||
response.send(methodResult, responsePayload, function(err) {
|
||||
debug('response.send returned ' + (err ? err : 'success'));
|
||||
assert(!err);
|
||||
});
|
||||
});
|
||||
|
||||
// Waiting for an arbitrary 2 seconds because we don't know when all the links above have been established.
|
||||
setTimeout(function() {
|
||||
debug('invoking method');
|
||||
testModule.serviceClient.invokeModuleMethod(testModule.deviceId, testModule.moduleId, methodParams, function(err, response) {
|
||||
debug('invokeModuleMethod returned ' + (err ? err : 'success'));
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
assert.strictEqual(response.status, methodResult);
|
||||
assert.deepEqual(response.payload, responsePayload);
|
||||
done();
|
||||
}
|
||||
});
|
||||
}, 2000);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,84 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
'use strict';
|
||||
|
||||
var ModuleIdentityHelper = require('./module_identity_helper.js');
|
||||
var assert = require('chai').assert;
|
||||
var debug = require('debug')('e2etests:module-twin');
|
||||
var Amqp = require('azure-iot-device-amqp').Amqp;
|
||||
var AmqpWs = require('azure-iot-device-amqp').AmqpWs;
|
||||
//var Mqtt = require('azure-iot-device-mqtt').Mqtt;
|
||||
//var MqttWs = require('azure-iot-device-mqtt').MqttWs;
|
||||
|
||||
var transportsToTest = [ Amqp, AmqpWs ];
|
||||
|
||||
describe('module twin', function() {
|
||||
this.timeout(46000);
|
||||
|
||||
transportsToTest.forEach(function(Transport) {
|
||||
describe('using ' + Transport.name, function() {
|
||||
var testModule = {};
|
||||
|
||||
before(function(done) {
|
||||
ModuleIdentityHelper.createModule(testModule, Transport, function(err) {
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
ModuleIdentityHelper.createModuleTwinObjects(testModule, done);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
after(function(done) {
|
||||
ModuleIdentityHelper.cleanUpAfterTest(testModule, done);
|
||||
});
|
||||
|
||||
it ('can receive desired property changes', function(done) {
|
||||
var patch = {
|
||||
properties: {
|
||||
desired: {
|
||||
fake_key: '__FAKE_VALUE__'
|
||||
}
|
||||
}
|
||||
};
|
||||
testModule.deviceTwin.on('properties.desired', function(props) {
|
||||
debug('received properties:' + JSON.stringify(props));
|
||||
if (props.fake_key === patch.properties.desired.fake_key) {
|
||||
done();
|
||||
}
|
||||
});
|
||||
debug('sending desired properties');
|
||||
testModule.serviceTwin.update(patch, function(err) {
|
||||
debug('twin.update returned ' + (err ? err : 'success'));
|
||||
assert(!err);
|
||||
});
|
||||
});
|
||||
|
||||
it('can send reported properties', function(done) {
|
||||
var patch = {
|
||||
another_fake_key: '__ANOTHER_FAKE_VALUE__'
|
||||
};
|
||||
|
||||
debug('updating reported properties');
|
||||
testModule.deviceTwin.properties.reported.update(patch, function(err) {
|
||||
debug('twin.properties.reported.update returned ' + (err ? err : 'success'));
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
debug('getting service twin');
|
||||
testModule.serviceTwin.get(function(err, twin) {
|
||||
debug('serviceTwin.get returned ' + (err ? err : 'success'));
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
assert.strictEqual(twin.properties.reported.another_fake_key, patch.another_fake_key);
|
||||
done();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -67,13 +67,19 @@ function closeDeviceEventHubClients(deviceClient, eventHubClient, ehReceivers, d
|
|||
done(eventHubErr);
|
||||
}
|
||||
});
|
||||
deviceClient.close(function (err) {
|
||||
deviceErr = err || eventHubErr;
|
||||
deviceClient = null;
|
||||
if (deviceErr || !eventHubClient) {
|
||||
done(deviceErr);
|
||||
if (!deviceClient) {
|
||||
if (!eventHubClient) {
|
||||
done();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
deviceClient.close(function (err) {
|
||||
deviceErr = err || eventHubErr;
|
||||
deviceClient = null;
|
||||
if (deviceErr || !eventHubClient) {
|
||||
done(deviceErr);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
|
|
|
@ -107,6 +107,30 @@ The `send` method sends a cloud-to-device message to the service, intended for d
|
|||
|
||||
**SRS_NODE_IOTHUB_CLIENT_16_030: [** The `send` method shall not throw if the `done` callback is falsy. **]**
|
||||
|
||||
sendToModule(deviceId: string, moduleId: string, inputName: string, message: Message | Message.BufferConvertible, done?: Callback<results.MessageEnqueued>): void;
|
||||
The `sendToModule` method sends a message to the service, intended for delivery to the given input on the given module on the given device
|
||||
|
||||
**SRS_NODE_IOTHUB_CLIENT_18_007: [** The `sendToModule` method shall throw `ReferenceError` if the deviceId, moduleId, inputName, or message arguments are falsy. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_CLIENT_18_008: [** The `sendToModule` method shall convert the message object to type azure-iot-common.Message if necessary. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_CLIENT_18_009: [** When the `sendToModule` method completes, the callback function (indicated by the done - argument) shall be invoked with the following arguments:
|
||||
- `err` - standard JavaScript Error object (or subclass)
|
||||
- `result` - an implementation-specific response object returned by the underlying protocol, useful for logging and troubleshooting **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_CLIENT_18_010: [** The argument `err` passed to the callback `done` shall be `null` if the protocol operation was successful. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_CLIENT_18_011: [** Otherwise the argument `err` shall have an `amqpError` property containing implementation-specific response information for use in logging and troubleshooting. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_CLIENT_18_012: [** If the `deviceId` has not been registered with the IoT Hub, `sendToModule` shall call the `done` callback with a `DeviceNotFoundError`. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_CLIENT_18_013: [** If the queue which receives messages on behalf of the device is full, `sendToModule` shall call the `done` callback with a `DeviceMaximumQueueDepthExceededError`. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_CLIENT_18_014: [** The `sendToModule` method shall use the retry policy defined either by default or by a call to `setRetryPolicy` if necessary to send the message. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_CLIENT_18_015: [** The `sendToModule` method shall not throw if the `done` callback is falsy. . **]**
|
||||
|
||||
|
||||
###getFeedbackReceiver(done)
|
||||
The `getFeedbackReceiver` method is used to obtain an `AmqpReceiver` object which emits events when new feedback messages are received by the client.
|
||||
|
||||
|
|
|
@ -628,6 +628,7 @@ Request-Id: <guid>
|
|||
DELETE /devices/<encodeURIComponent(deviceId)>/modules/<encodeURIComponent(moduleId)>?api-version=<version> HTTP/1.1
|
||||
Authorization: <sharedAccessSignature>
|
||||
Request-Id: <guid>
|
||||
If-Match: "*"
|
||||
```
|
||||
**]**
|
||||
|
||||
|
|
|
@ -88,6 +88,27 @@ The `send` method sends an event to the device passed as argument, using the IoT
|
|||
|
||||
**SRS_NODE_IOTHUB_SERVICE_AMQP_16_030: [** The `send` method shall call the `send` method of the C2D link and pass it the Amqp request that it created. **]**
|
||||
|
||||
### sendToModule(deviceId: string, moduleId: string, inputName: string, message: Message, done?: Callback<results.MessageEnqueued>): void;
|
||||
|
||||
**SRS_NODE_IOTHUB_SERVICE_AMQP_18_001: [** The `sendToModule` method shall construct an AMQP request using the message passed in argument as the body of the message. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_SERVICE_AMQP_18_002: [** The message generated by the `sendToModule` method should have its “to” field set to the `deviceId` & `moduleId` passed as arguments. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_SERVICE_AMQP_18_009: [** The message generated by the `sendToModule` method should have its “x-opt-input-name” annotation set to the `inputName` passed a an argument. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_SERVICE_AMQP_18_003: [** The `sendToModule` method shall connect and authenticate the transport if it is disconnected. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_SERVICE_AMQP_18_004: [** The `sendToModule` method shall call its callback with an error if connecting and/or authenticating the transport fails. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_SERVICE_AMQP_18_005: [** The `sendToModule` method shall attach the C2D link if necessary. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_SERVICE_AMQP_18_006: [** The `sendToModule` method shall reuse the C2D link if it is already attached. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_SERVICE_AMQP_18_007: [** The `sendToModule` method shall call its callback with an error if it fails to attach the C2D link. **]**
|
||||
|
||||
**SRS_NODE_IOTHUB_SERVICE_AMQP_18_008: [** The `sendToModule` method shall call the `send` method of the C2D link and pass it the Amqp request that it created. **]**
|
||||
|
||||
|
||||
### getFeedbackReceiver(done)
|
||||
Gets the `AmqpReceiver` object used to subscribe to feedback messages and errors sent by devices to this IoT Hub instance.
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@
|
|||
"alltest": "istanbul cover node_modules/mocha/bin/_mocha -- --reporter spec test/_*_test*.js",
|
||||
"ci": "npm run typings && npm -s run lint && npm -s run build && npm -s run alltest-min && npm -s run check-cover",
|
||||
"test": "npm -s run lint && npm -s run build && npm -s run unittest",
|
||||
"check-cover": "istanbul check-coverage --statements 97 --branches 93 --lines 98 --functions 93",
|
||||
"check-cover": "istanbul check-coverage --statements 97 --branches 92 --lines 97 --functions 93",
|
||||
"cover": "istanbul cover node_modules/mocha/bin/_mocha -- --reporter spec test/_*_test*.js"
|
||||
},
|
||||
"engines": {
|
||||
|
|
|
@ -120,14 +120,16 @@ export class Amqp extends EventEmitter implements Client.Transport {
|
|||
this._fsm.transition('connecting', callback);
|
||||
},
|
||||
disconnect: (callback) => callback(),
|
||||
send: (message, deviceEndpoint, callback) => {
|
||||
send: (amqpMessage, deviceEndpoint, callback) => {
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_025: [The `send` method shall connect and authenticate the transport if it is disconnected.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_18_003: [The `sendToModule` method shall connect and authenticate the transport if it is disconnected.]*/
|
||||
this._fsm.handle('connect', (err) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_026: [The `send` method shall call its callback with an error if connecting and/or authenticating the transport fails.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_18_004: [The `sendToModule` method shall call its callback with an error if connecting and/or authenticating the transport fails.]*/
|
||||
callback(err);
|
||||
} else {
|
||||
this._fsm.handle('send', message, deviceEndpoint, callback);
|
||||
this._fsm.handle('send', amqpMessage, deviceEndpoint, callback);
|
||||
}
|
||||
});
|
||||
},
|
||||
|
@ -232,26 +234,29 @@ export class Amqp extends EventEmitter implements Client.Transport {
|
|||
},
|
||||
connect: (callback) => callback(),
|
||||
disconnect: (callback) => this._fsm.transition('disconnecting', null, callback),
|
||||
send: (message, deviceEndpoint, callback) => {
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_002: [The `send` method shall construct an AMQP request using the message passed in argument as the body of the message.]*/
|
||||
let amqpMessage = AmqpMessage.fromMessage(message);
|
||||
send: (amqpMessage, deviceEndpoint, callback) => {
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_003: [The message generated by the `send` method should have its “to” field set to the device ID passed as an argument.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_18_002: [The message generated by the `sendToModule` method should have its “to” field set to the `deviceId` & `moduleId` passed as arguments.]*/
|
||||
amqpMessage.properties.to = deviceEndpoint;
|
||||
if (!this._c2dLink) {
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_027: [The `send` method shall attach the C2D link if necessary.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_18_005: [The `sendToModule` method shall attach the C2D link if necessary.]*/
|
||||
this._amqp.attachSenderLink(this._c2dEndpoint, null, (err, link) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_029: [The `send` method shall call its callback with an error if it fails to attach the C2D link.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_18_007: [The `sendToModule` method shall call its callback with an error if it fails to attach the C2D link.]*/
|
||||
callback(err);
|
||||
} else {
|
||||
this._c2dLink = link;
|
||||
this._c2dLink.on('error', this._c2dErrorListener);
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_030: [The `send` method shall call the `send` method of the C2D link and pass it the Amqp request that it created.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_18_008: [The `sendToModule` method shall call the `send` method of the C2D link and pass it the Amqp request that it created.]*/
|
||||
this._c2dLink.send(amqpMessage, callback);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_028: [The `send` method shall reuse the C2D link if it is already attached.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_18_006: [The `sendToModule` method shall reuse the C2D link if it is already attached.]*/
|
||||
this._c2dLink.send(amqpMessage, callback);
|
||||
}
|
||||
},
|
||||
|
@ -455,7 +460,25 @@ export class Amqp extends EventEmitter implements Client.Transport {
|
|||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_003: [The message generated by the send method should have its “to” field set to the device ID passed as an argument.]*/
|
||||
send(deviceId: string, message: Message, done: Client.Callback<results.MessageEnqueued>): void {
|
||||
const deviceEndpoint = endpoint.messagePath(encodeURIComponent(deviceId));
|
||||
this._fsm.handle('send', message, deviceEndpoint, handleResult('AMQP Transport: Could not send message', done));
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_002: [The `send` method shall construct an AMQP request using the message passed in argument as the body of the message.]*/
|
||||
let amqpMessage = AmqpMessage.fromMessage(message);
|
||||
this._fsm.handle('send', amqpMessage, deviceEndpoint, handleResult('AMQP Transport: Could not send message', done));
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_18_001: [The `sendToModule` method shall construct an AMQP request using the message passed in argument as the body of the message.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_18_002: [The message generated by the `sendToModule` method should have its “to” field set to the `deviceId` & `moduleId` passed as arguments.]*/
|
||||
sendToModule(deviceId: string, moduleId: string, inputName: string, message: Message, done?: Callback<results.MessageEnqueued>): void {
|
||||
const deviceEndpoint = endpoint.messagePath(encodeURIComponent(deviceId), encodeURIComponent(moduleId));
|
||||
let amqpMessage = AmqpMessage.fromMessage(message);
|
||||
if (!amqpMessage.messageAnnotations) {
|
||||
amqpMessage.messageAnnotations = {};
|
||||
}
|
||||
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_18_009: [The message generated by the `sendToModule` method should have its “x-opt-input-name” annotation set to the `inputName` passed a an argument.]*/
|
||||
amqpMessage.messageAnnotations['x-opt-input-name'] = inputName;
|
||||
this._fsm.handle('send', amqpMessage, deviceEndpoint, handleResult('AMQP Transport: Could not send message', done));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -166,6 +166,67 @@ export class Client extends EventEmitter {
|
|||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @method module:azure-iothub.Client#sendToModule
|
||||
* @description Sends a message to a device.
|
||||
* @param {String} deviceId The identifier of an existing device identity.
|
||||
* @param {String} moduleId The identifier of an existing module.
|
||||
* @param {String} inputName Name of the input that the message is destined for.
|
||||
* @param {Object} message The body of the message to send to the module.
|
||||
* If `message` is not of type
|
||||
* {@link module:azure-iot-common.Message|Message},
|
||||
* it will be converted.
|
||||
* @param {Function} done The function to call when the operation is
|
||||
* complete. `done` will be called with two
|
||||
* arguments: an Error object (can be null) and a
|
||||
* transport-specific response object useful for
|
||||
* logging or debugging.
|
||||
*
|
||||
* @throws {ReferenceError} If `deviceId`, `moduleId`, `inputName`, or `message` is null, undefined or empty.
|
||||
*/
|
||||
sendToModule(deviceId: string, moduleId: string, inputName: string, message: Message | Message.BufferConvertible, done?: Callback<results.MessageEnqueued>): void {
|
||||
/*Codes_SRS_NODE_IOTHUB_CLIENT_18_007: [The `sendToModule` method shall throw `ReferenceError` if the deviceId, moduleId, inputName, or message arguments are falsy.]*/
|
||||
if (!deviceId) {
|
||||
throw new ReferenceError('deviceId is \'' + deviceId + '\'');
|
||||
}
|
||||
if (!moduleId) {
|
||||
throw new ReferenceError('moduleId is \'' + moduleId + '\'');
|
||||
}
|
||||
if (!inputName) {
|
||||
throw new ReferenceError('inputName is \'' + inputName + '\'');
|
||||
}
|
||||
if (!message) {
|
||||
throw new ReferenceError('message is \'' + message + '\'');
|
||||
}
|
||||
/*Codes_SRS_NODE_IOTHUB_CLIENT_18_008: [The `sendToModule` method shall convert the message object to type azure-iot-common.Message if necessary.]*/
|
||||
if ((<any>message.constructor).name !== 'Message') {
|
||||
message = new Message(message as Message.BufferConvertible);
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_IOTHUB_CLIENT_18_009: [When the `sendToModule` method completes, the callback function (indicated by the done - argument) shall be invoked with the following arguments:
|
||||
- `err` - standard JavaScript Error object (or subclass)
|
||||
- `result` - an implementation-specific response object returned by the underlying protocol, useful for logging and troubleshooting]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_CLIENT_18_010: [The argument `err` passed to the callback `done` shall be `null` if the protocol operation was successful.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_CLIENT_18_011: [Otherwise the argument `err` shall have an `amqpError` property containing implementation-specific response information for use in logging and troubleshooting.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_CLIENT_18_012: [If the `deviceId` has not been registered with the IoT Hub, `sendToModule` shall call the `done` callback with a `DeviceNotFoundError`.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_CLIENT_18_013: [If the queue which receives messages on behalf of the device is full, `sendToModule` shall call the `done` callback with a `DeviceMaximumQueueDepthExceededError`.]*/
|
||||
/*Codes_SRS_NODE_IOTHUB_CLIENT_18_014: [The `sendToModule` method shall use the retry policy defined either by default or by a call to `setRetryPolicy` if necessary to send the message.]*/
|
||||
const retryOp = new RetryOperation(this._retryPolicy, MAX_RETRY_TIMEOUT);
|
||||
retryOp.retry((retryCallback) => {
|
||||
this._transport.sendToModule(deviceId, moduleId, inputName, message as Message, retryCallback);
|
||||
}, (err, result) => {
|
||||
/*Codes_SRS_NODE_IOTHUB_CLIENT_18_015: [The `sendToModule` method shall not throw if the `done` callback is falsy. .]*/
|
||||
if (done) {
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
done(null, result);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @method module:azure-iothub.Client#invokeDeviceMethod
|
||||
* @description Invokes a method on a particular device.
|
||||
|
@ -446,6 +507,7 @@ export namespace Client {
|
|||
connect(done?: Callback<results.Connected>): void;
|
||||
disconnect(done: Callback<results.Disconnected>): void;
|
||||
send(deviceId: string, message: Message, done?: Callback<results.MessageEnqueued>): void;
|
||||
sendToModule(deviceId: string, moduleId: string, inputName: string, message: Message, done?: Callback<results.MessageEnqueued>): void;
|
||||
getFeedbackReceiver(done: Callback<ServiceReceiver>): void;
|
||||
getFileNotificationReceiver(done: Callback<ServiceReceiver>): void;
|
||||
}
|
||||
|
|
|
@ -1072,11 +1072,16 @@ export class Registry {
|
|||
DELETE /devices/<encodeURIComponent(deviceId)>/modules/<encodeURIComponent(moduleId)>?api-version=<version> HTTP/1.1
|
||||
Authorization: <sharedAccessSignature>
|
||||
Request-Id: <guid>
|
||||
If-Match: "*"
|
||||
```
|
||||
]*/
|
||||
const httpHeaders = {
|
||||
'If-Match': '"*"'
|
||||
};
|
||||
|
||||
const path = `/devices/${encodeURIComponent(deviceId)}/modules/${encodeURIComponent(moduleId)}${endpoint.versionQueryString()}`;
|
||||
|
||||
this._restApiClient.executeApiCall('DELETE', path, null, null, done);
|
||||
this._restApiClient.executeApiCall('DELETE', path, httpHeaders, null, done);
|
||||
}
|
||||
|
||||
private _bulkOperation(devices: Registry.DeviceDescription[], done: Callback<any>): void {
|
||||
|
|
|
@ -529,120 +529,144 @@ describe('Amqp', function() {
|
|||
});
|
||||
});
|
||||
|
||||
describe('#send', function () {
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_025: [The `send` method shall connect and authenticate the transport if it is disconnected.]*/
|
||||
it('connects the transport if necessary', function (testCallback) {
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, null, new results.MessageEnqueued());
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, null, fakeSender);
|
||||
[
|
||||
{
|
||||
functionUnderTest: 'send',
|
||||
invokeFunctionUnderTest: function(amqp, msg, callback) { amqp.send('fakeDeviceId', msg, callback); },
|
||||
expectedDestination: '/devices/fakeDeviceId/messages/devicebound',
|
||||
expectedInputName: null
|
||||
},
|
||||
{
|
||||
functionUnderTest: 'sendToModule',
|
||||
invokeFunctionUnderTest: function(amqp, msg, callback) { amqp.sendToModule('fakeDeviceId', 'fakeModuleId', 'fakeInputName', msg, callback); },
|
||||
expectedDestination: '/devices/fakeDeviceId/modules/fakeModuleId/messages/devicebound',
|
||||
expectedInputName: 'fakeInputName'
|
||||
}
|
||||
].forEach(function(testConfig) {
|
||||
describe('#' + testConfig.functionUnderTest, function () {
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_025: [The `send` method shall connect and authenticate the transport if it is disconnected.]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_18_003: [The `sendToModule` method shall connect and authenticate the transport if it is disconnected.]*/
|
||||
it('connects the transport if necessary', function (testCallback) {
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, null, new results.MessageEnqueued());
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, null, fakeSender);
|
||||
|
||||
amqp.send('fakeDeviceId', new Message('foo'), function () {
|
||||
assert.isTrue(fakeAmqpBase.connect.calledOnce);
|
||||
assert.isTrue(fakeAmqpBase.initializeCBS.calledOnce);
|
||||
assert.isTrue(fakeAmqpBase.putToken.calledOnce);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_026: [The `send` method shall call its callback with an error if connecting and/or authenticating the transport fails.]*/
|
||||
it('calls its callback with an error if connecting the transport fails', function (testCallback) {
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.connect = sinon.stub().callsArgWith(2, new Error('fakeError'));
|
||||
|
||||
amqp.send('fakeDeviceId', new Message('foo'), function (err) {
|
||||
assert.isTrue(fakeAmqpBase.connect.calledOnce);
|
||||
assert.instanceOf(err, Error);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_027: [The `send` method shall attach the C2D link if necessary.]*/
|
||||
it('attaches the link if necessary', function (testCallback) {
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, null, new results.MessageEnqueued());
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, fakeSender);
|
||||
|
||||
amqp.connect(function () {
|
||||
amqp.send('fakeDeviceId', new Message('foo'), function () {
|
||||
assert.isTrue(fakeAmqpBase.attachSenderLink.calledOnce);
|
||||
assert.isTrue(fakeAmqpBase.attachSenderLink.calledWith('/messages/devicebound'));
|
||||
testConfig.invokeFunctionUnderTest(amqp, new Message('foo'), function () {
|
||||
assert.isTrue(fakeAmqpBase.connect.calledOnce);
|
||||
assert.isTrue(fakeAmqpBase.initializeCBS.calledOnce);
|
||||
assert.isTrue(fakeAmqpBase.putToken.calledOnce);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_029: [The `send` method shall call its callback with an error if it fails to attach the C2D link.]*/
|
||||
it('calls its callback with an error if the link fails to attach', function (testCallback) {
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, null, new results.MessageEnqueued());
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, new Error('fake error'));
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_026: [The `send` method shall call its callback with an error if connecting and/or authenticating the transport fails.]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_18_004: [The `sendToModule` method shall call its callback with an error if connecting and/or authenticating the transport fails.]*/
|
||||
it('calls its callback with an error if connecting the transport fails', function (testCallback) {
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.connect = sinon.stub().callsArgWith(2, new Error('fakeError'));
|
||||
|
||||
amqp.connect(function () {
|
||||
amqp.send('fakeDeviceId', new Message('foo'), function (err) {
|
||||
assert.isTrue(fakeAmqpBase.attachSenderLink.calledOnce);
|
||||
testConfig.invokeFunctionUnderTest(amqp, new Message('foo'), function (err) {
|
||||
assert.isTrue(fakeAmqpBase.connect.calledOnce);
|
||||
assert.instanceOf(err, Error);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_030: [The `send` method shall call the `send` method of the C2D link and pass it the Amqp request that it created.]*/
|
||||
it('calls send on the c2d link object', function (testCallback) {
|
||||
var fakeDeviceId = 'fakeDeviceId';
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, null, new results.MessageEnqueued());
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, null, fakeSender);
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_027: [The `send` method shall attach the C2D link if necessary.]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_18_005: [The `sendToModule` method shall attach the C2D link if necessary.]*/
|
||||
it('attaches the link if necessary', function (testCallback) {
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, null, new results.MessageEnqueued());
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, fakeSender);
|
||||
|
||||
amqp.connect(function () {
|
||||
amqp.send(fakeDeviceId, new Message('foo'), function () {
|
||||
assert.isTrue(fakeSender.send.calledOnce);
|
||||
assert.instanceOf(fakeSender.send.firstCall.args[0], AmqpMessage);
|
||||
assert.strictEqual(fakeSender.send.firstCall.args[0].properties.to, '/devices/' + fakeDeviceId + '/messages/devicebound');
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_017: [All asynchronous instance methods shall call the `done` callback with a single parameter that is derived from the standard Javascript `Error` object if the operation failed.]*/
|
||||
it('calls its callback with an error if the link fails to send the message', function (testCallback) {
|
||||
var fakeDeviceId = 'fakeDeviceId';
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, new Error('fake'));
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, null, fakeSender);
|
||||
|
||||
amqp.connect(function () {
|
||||
amqp.send(fakeDeviceId, new Message('foo'), function (err) {
|
||||
assert.isTrue(fakeSender.send.calledOnce);
|
||||
assert.instanceOf(err, Error);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_028: [The `send` method shall reuse the C2D link if it is already attached.]*/
|
||||
it('Reuses the c2d link object', function (testCallback) {
|
||||
var fakeDeviceId = 'fakeDeviceId';
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, null, new results.MessageEnqueued());
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, null, fakeSender);
|
||||
|
||||
amqp.connect(function () {
|
||||
amqp.send(fakeDeviceId, new Message('test1'), function () {
|
||||
assert.isTrue(fakeAmqpBase.attachSenderLink.calledOnce);
|
||||
assert.isTrue(fakeSender.send.calledOnce);
|
||||
amqp.send(fakeDeviceId, new Message('test2'), function () {
|
||||
amqp.connect(function () {
|
||||
testConfig.invokeFunctionUnderTest(amqp, new Message('foo'), function () {
|
||||
assert.isTrue(fakeAmqpBase.attachSenderLink.calledOnce);
|
||||
assert.isTrue(fakeSender.send.calledTwice);
|
||||
assert.isTrue(fakeAmqpBase.attachSenderLink.calledWith('/messages/devicebound'));
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_029: [The `send` method shall call its callback with an error if it fails to attach the C2D link.]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_18_007: [The `sendToModule` method shall call its callback with an error if it fails to attach the C2D link.]*/
|
||||
it('calls its callback with an error if the link fails to attach', function (testCallback) {
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, null, new results.MessageEnqueued());
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, new Error('fake error'));
|
||||
|
||||
amqp.connect(function () {
|
||||
testConfig.invokeFunctionUnderTest(amqp, new Message('foo'), function (err) {
|
||||
assert.isTrue(fakeAmqpBase.attachSenderLink.calledOnce);
|
||||
assert.instanceOf(err, Error);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_030: [The `send` method shall call the `send` method of the C2D link and pass it the Amqp request that it created.]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_18_008: [The `sendToModule` method shall call the `send` method of the C2D link and pass it the Amqp request that it created.]*/
|
||||
it('calls send on the c2d link object', function (testCallback) {
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, null, new results.MessageEnqueued());
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, null, fakeSender);
|
||||
|
||||
amqp.connect(function () {
|
||||
testConfig.invokeFunctionUnderTest(amqp, new Message('foo'), function () {
|
||||
assert.isTrue(fakeSender.send.calledOnce);
|
||||
assert.instanceOf(fakeSender.send.firstCall.args[0], AmqpMessage);
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_003: [The message generated by the `send` method should have its “to” field set to the device ID passed as an argument.]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_18_002: [The message generated by the `sendToModule` method should have its “to” field set to the `deviceId` & `moduleId` passed as arguments.]*/
|
||||
assert.strictEqual(fakeSender.send.firstCall.args[0].properties.to, testConfig.expectedDestination);
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_18_009: [The message generated by the `sendToModule` method should have its “x-opt-input-name” annotation set to the `inputName` passed a an argument.]*/
|
||||
if (testConfig.expectedInputName) {
|
||||
assert.strictEqual(fakeSender.send.firstCall.args[0].messageAnnotations['x-opt-input-name'], testConfig.expectedInputName);
|
||||
}
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_017: [All asynchronous instance methods shall call the `done` callback with a single parameter that is derived from the standard Javascript `Error` object if the operation failed.]*/
|
||||
it('calls its callback with an error if the link fails to send the message', function (testCallback) {
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, new Error('fake'));
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, null, fakeSender);
|
||||
|
||||
amqp.connect(function () {
|
||||
testConfig.invokeFunctionUnderTest(amqp, new Message('foo'), function (err) {
|
||||
assert.isTrue(fakeSender.send.calledOnce);
|
||||
assert.instanceOf(err, Error);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_16_028: [The `send` method shall reuse the C2D link if it is already attached.]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_SERVICE_AMQP_18_006: [The `sendToModule` method shall reuse the C2D link if it is already attached.]*/
|
||||
it('Reuses the c2d link object', function (testCallback) {
|
||||
var fakeSender = new EventEmitter();
|
||||
fakeSender.send = sinon.stub().callsArgWith(1, null, new results.MessageEnqueued());
|
||||
var amqp = new Amqp(fakeConfig, fakeAmqpBase);
|
||||
fakeAmqpBase.attachSenderLink = sinon.stub().callsArgWith(2, null, fakeSender);
|
||||
|
||||
amqp.connect(function () {
|
||||
testConfig.invokeFunctionUnderTest(amqp, new Message('test1'), function () {
|
||||
assert.isTrue(fakeAmqpBase.attachSenderLink.calledOnce);
|
||||
assert.isTrue(fakeSender.send.calledOnce);
|
||||
testConfig.invokeFunctionUnderTest(amqp, new Message('test2'), function () {
|
||||
assert.isTrue(fakeAmqpBase.attachSenderLink.calledOnce);
|
||||
assert.isTrue(fakeSender.send.calledTwice);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -88,12 +88,53 @@ function transportSpecificTests(opts) {
|
|||
});
|
||||
});
|
||||
|
||||
describe('#sendToModule', function () {
|
||||
function createTestMessage() {
|
||||
var msg = new Message('msg');
|
||||
msg.expiryTimeUtc = Date.now() + 5000; // Expire 5s from now, to reduce the chance of us hitting the 50-message limit on the IoT Hub
|
||||
return msg;
|
||||
}
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_18_009: [When the `sendToModule` method completes, the callback function (indicated by the done - argument) shall be invoked with the following arguments:
|
||||
- `err` - standard JavaScript Error object (or subclass)
|
||||
- `result` - an implementation-specific response object returned by the underlying protocol, useful for logging and troubleshooting]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_18_010: [The argument `err` passed to the callback `done` shall be `null` if the protocol operation was successful.]*/
|
||||
it('sends the message', function (done) {
|
||||
testSubject.send(deviceId, createTestMessage(), function (err, state) {
|
||||
if (!err) {
|
||||
assert.equal(state.constructor.name, "MessageEnqueued");
|
||||
}
|
||||
done(err);
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_18_008: [The `sendToModule` method shall convert the message object to type azure-iot-common.Message if necessary.]*/
|
||||
it('accepts any message that is convertible to type Message', function (done) {
|
||||
var message = 'msg';
|
||||
testSubject.send(deviceId, message, function (err, state) {
|
||||
if (!err) {
|
||||
assert.equal(state.constructor.name, "MessageEnqueued");
|
||||
}
|
||||
done(err);
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_18_012: [If the `deviceId` has not been registered with the IoT Hub, `sendToModule` shall call the `done` callback with a `DeviceNotFoundError`.]*/
|
||||
it('returns DeviceNotFoundError when sending to an unregistered deviceId', function (done) {
|
||||
var unregisteredDeviceId = 'no-device' + Math.random();
|
||||
testSubject.send(unregisteredDeviceId, new Message('msg'), function (err) {
|
||||
assert.instanceOf(err, errors.DeviceNotFoundError);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('#getFeedbackReceiver', function () {
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_05_027: [When the `getFeedbackReceiver` method completes, the callback function (indicated by the `done` argument) shall be invoked with the following arguments:
|
||||
- `err` - standard JavaScript `Error` object (or subclass): `null` if the operation was successful
|
||||
- `receiver` - an `AmqpReceiver` instance: `undefined` if the operation failed]*/
|
||||
it('calls the `done` callback with a null error object and an AmqpReceiver object if the operation succeeds', function (done) {
|
||||
testSubject.getFeedbackReceiver(function (err, receiver) {
|
||||
testSubject.getFeedbackReceiver(function (err) {
|
||||
if (err) done(err);
|
||||
else {
|
||||
done();
|
||||
|
@ -107,7 +148,7 @@ function transportSpecificTests(opts) {
|
|||
- `err` - standard JavaScript `Error` object (or subclass): `null` if the operation was successful
|
||||
- `receiver` - an `AmqpReceiver` instance: `undefined` if the operation failed]*/
|
||||
it('calls the `done` callback with a null error object and an AmqpReceiver object if the operation succeeds', function (done) {
|
||||
testSubject.getFileNotificationReceiver(function (err, receiver) {
|
||||
testSubject.getFileNotificationReceiver(function (err) {
|
||||
if (err) done(err);
|
||||
else {
|
||||
done();
|
||||
|
|
|
@ -130,6 +130,41 @@ describe('Client', function () {
|
|||
});
|
||||
});
|
||||
|
||||
describe('#sendToModule', function () {
|
||||
var testSubject;
|
||||
|
||||
beforeEach('prepare test subject', function () {
|
||||
testSubject = new Client({}, {});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_18_007: [The `sendToModule` method shall throw `ReferenceError` if the deviceId, moduleId, inputName, or message arguments are falsy.]*/
|
||||
['deviceId', 'moduleId', 'inputName', 'message'].forEach(function(paramToTest) {
|
||||
[ null, '', undefined, ].forEach(function(falsyValue) {
|
||||
it('throws if ' + paramToTest + ' is ' + falsyValue, function() {
|
||||
var params = {
|
||||
deviceId: '__FAKE_DEVICE_ID__',
|
||||
moduleId: '__FAKE_MODULE_ID__',
|
||||
inputName: '__FAKE_INPUT_NAME__',
|
||||
message: new Message('msg')
|
||||
};
|
||||
params[paramToTest] = falsyValue;
|
||||
assert.throws(function() {
|
||||
testSubject.sendToModule(params.deviceId, params.moduleId, params.inputName, params.messageName);
|
||||
}, ReferenceError);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_18_015: [The `sendToModule` method shall not throw if the `done` callback is falsy. .]*/
|
||||
it('does not throw if done is falsy', function () {
|
||||
var simulatedAmqp = new SimulatedAmqp();
|
||||
var client = new Client(simulatedAmqp);
|
||||
assert.doesNotThrow(function () {
|
||||
client.sendToModule('deviceId', 'moduleId', 'inputName', new Message('msg'));
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('#invokeDeviceMethod', function() {
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_16_014: [The `invokeDeviceMethod` method shall throw a `ReferenceError` if `deviceId` is `null`, `undefined` or an empty string.]*/
|
||||
[undefined, null, ''].forEach(function(badDeviceId) {
|
||||
|
@ -191,7 +226,7 @@ describe('Client', function () {
|
|||
describe('#' + testConfig.name, function() {
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_16_009: [The `invokeDeviceMethod` method shall initialize a new instance of `DeviceMethod` with the `methodName` and `timeout` values passed in the arguments.]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_16_010: [The `invokeDeviceMethod` method shall use the newly created instance of `DeviceMethod` to invoke the method with the `payload` argument on the device specified with the `deviceid` argument .]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_16_013: [The `invokeDeviceMethod` method shall call the `done` callback with a `null` first argument, the result of the method execution in the second argument, and the transport-specific response object as a third argument.]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_16_013: [The `invokeDeviceMethod` method sendthe `done` callback with a `null` first argument, the result of the method execution in the second argument, and the transport-specific response object as a third argument.]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_18_002: [The `invokeModuleMethod` method shall initialize a new `DeviceMethod` instance with `methodParams` values passed in the arguments. ]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_18_003: [The `invokeModuleMethod` method shall call `invokeOnModule` on the new `DeviceMethod` instance. ]*/
|
||||
/*Tests_SRS_NODE_IOTHUB_CLIENT_18_005: [The `invokeModuleMethod` method shall call the `done` callback with a `null` first argument, the result of the method execution in the second argument, and the transport-specific response object as a third argument. ]*/
|
||||
|
|
|
@ -1561,6 +1561,7 @@ describe('Registry', function() {
|
|||
DELETE /devices/<encodeURIComponent(deviceId)>/modules/<encodeURIComponent(moduleId)>?api-version=<version> HTTP/1.1
|
||||
Authorization: <sharedAccessSignature>
|
||||
Request-Id: <guid>
|
||||
If-Match: "*"
|
||||
```
|
||||
]*/
|
||||
it('constructs a valid HTTP request', function(testCallback) {
|
||||
|
@ -1568,6 +1569,7 @@ describe('Registry', function() {
|
|||
executeApiCall: function (method, path, httpHeaders, body, done) {
|
||||
assert.strictEqual(method, 'DELETE');
|
||||
assert.strictEqual(path, '/devices/deviceId/modules/moduleId' + endpoint.versionQueryString());
|
||||
assert.strictEqual(httpHeaders['If-Match'], '"*"');
|
||||
done();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -47,6 +47,26 @@ SimulatedAmqp.prototype.send = function send(deviceId, message, done) {
|
|||
}
|
||||
};
|
||||
|
||||
SimulatedAmqp.prototype.sendToModule = function send(deviceId, moduleId, inputName, message, done) {
|
||||
if (done) {
|
||||
if (deviceId.search(/^no-device/) !== -1) {
|
||||
done(new errors.DeviceNotFoundError());
|
||||
}
|
||||
else {
|
||||
done(null, new results.MessageEnqueued());
|
||||
if (message.ack === 'full') {
|
||||
this._receiver.emit('message', {
|
||||
body: [{
|
||||
originalMessageId: message.messageId,
|
||||
deviceId: deviceId,
|
||||
moduleId: moduleId
|
||||
}]
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
SimulatedAmqp.prototype.getFeedbackReceiver = function (done) {
|
||||
if (this._config.sharedAccessSignature === 'fail') {
|
||||
done(new Error('error'));
|
||||
|
|
Загрузка…
Ссылка в новой задаче