Use inheritance instead of composition for clients
Adjust requirements and tests make IOTEDGE_AUTHSCHEME case insensitive
This commit is contained in:
Родитель
e290bcd588
Коммит
21a130fb1d
|
@ -0,0 +1,75 @@
|
|||
# Client requirements (Device client)
|
||||
|
||||
The `Client` class is the historical client object that is used by devices to connect to IoT Hub and Edge Hub. Modules use the `ModuleClient` class.
|
||||
|
||||
## Overview
|
||||
|
||||
```typescript
|
||||
class Client extends InternalClient {
|
||||
uploadToBlob(blobName: string, stream: Stream, streamLength: number, done: (err?: Error) => void): void;
|
||||
on(type = 'message', msgHandler: (msg: Message) => void): void;
|
||||
|
||||
static fromConnectionString(connStr: string, transportCtor: any): any;
|
||||
static fromSharedAccessSignature(sharedAccessSignature: string, transportCtor: any): any;
|
||||
static fromAuthenticationProvider(authenticationProvider: AuthenticationProvider, transportCtor: any): any;
|
||||
}
|
||||
```
|
||||
|
||||
## Public API
|
||||
|
||||
### fromConnectionString
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_05_003: [** The `fromConnectionString` method shall throw ReferenceError if the connStr argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_05_006: [** The `fromConnectionString` method shall return a new instance of the `Client` object, as by a call to `new Client(new Transport(...))`. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_087: [** The `fromConnectionString` method shall create a new `SharedAccessKeyAuthorizationProvider` object with the connection string passed as argument if it contains a SharedAccessKey parameter and pass this object to the transport constructor. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_093: [** The `fromConnectionString` method shall create a new `X509AuthorizationProvider` object with the connection string passed as argument if it contains an X509 parameter and pass this object to the transport constructor. **]**
|
||||
|
||||
### fromSharedAccessSignature
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_029: [** The `fromSharedAccessSignature` method shall throw a `ReferenceError` if the sharedAccessSignature argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_030: [** The `fromSharedAccessSignature` method shall return a new instance of the `Client` object **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_088: [** The `fromSharedAccessSignature` method shall create a new `SharedAccessSignatureAuthorizationProvider` object with the shared access signature passed as argument, and pass this object to the transport constructor. **]**
|
||||
|
||||
### fromAuthenticationProvider
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_089: [** The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `authenticationProvider` argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_092: [** The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `transportCtor` argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_090: [** The `fromAuthenticationProvider` method shall pass the `authenticationProvider` object passed as argument to the transport constructor. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_091: [** The `fromAuthenticationProvider` method shall return a `Client` object configured with a new instance of a transport created using the `transportCtor` argument. **]**
|
||||
|
||||
### uploadToBlob(blobName, stream, done)
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_037: [** The `uploadToBlob` method shall throw a `ReferenceError` if `blobName` is falsy. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_038: [** The `uploadToBlob` method shall throw a `ReferenceError` if `stream` is falsy. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_039: [** The `uploadToBlob` method shall throw a `ReferenceError` if `streamLength` is falsy. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_040: [** The `uploadToBlob` method shall call the `done` callback with an `Error` object if the upload fails. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_041: [** The `uploadToBlob` method shall call the `done` callback no parameters if the upload succeeds. **]**
|
||||
|
||||
|
||||
### on('message', messageHandler)
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_002: [** The `message` event shall be emitted when a cloud-to-device message is received from the IoT Hub service. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_003: [** The `message` event parameter shall be a `message` object. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_004: [** The client shall start listening for messages from the service whenever there is a listener subscribed to the `message` event. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_005: [** The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `message` event. **]**
|
||||
|
||||
### on('disconnect', disconnectHandler)
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_102: [** If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object. **]**
|
||||
|
||||
**SRS_NODE_DEVICE_CLIENT_16_097: [** If the transport emits a `disconnect` event event while the client is subscribed to c2d messages the retry policy shall be used to reconnect and re-enable the feature using the transport `enableC2D` method. **]**
|
|
@ -5,34 +5,6 @@ azure-iot-device.InternalClient is an internal class which provides a means for
|
|||
|
||||
## Public Interface
|
||||
|
||||
### Factory methods
|
||||
#### fromConnectionString
|
||||
**SRS_NODE_INTERNAL_CLIENT_05_003: [** The `fromConnectionString` method shall throw ReferenceError if the connStr argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_05_006: [** The `fromConnectionString` method shall return a new instance of the `Client` object, as by a call to `new Client(new Transport(...))`. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_087: [** The `fromConnectionString` method shall create a new `SharedAccessKeyAuthorizationProvider` object with the connection string passed as argument if it contains a SharedAccessKey parameter and pass this object to the transport constructor. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_093: [** The `fromConnectionString` method shall create a new `X509AuthorizationProvider` object with the connection string passed as argument if it contains an X509 parameter and pass this object to the transport constructor. **]**
|
||||
|
||||
#### fromSharedAccessSignature
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_029: [** The `fromSharedAccessSignature` method shall throw a `ReferenceError` if the sharedAccessSignature argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_030: [** The `fromSharedAccessSignature` method shall return a new instance of the `Client` object **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_088: [** The `fromSharedAccessSignature` method shall create a new `SharedAccessSignatureAuthorizationProvider` object with the shared access signature passed as argument, and pass this object to the transport constructor. **]**
|
||||
|
||||
#### fromAuthenticationProvider
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_089: [** The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `authenticationProvider` argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_092: [** The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `transportCtor` argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_090: [** The `fromAuthenticationProvider` method shall pass the `authenticationProvider` object passed as argument to the transport constructor. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_091: [** The `fromAuthenticationProvider` method shall return a `Client` object configured with a new instance of a transport created using the `transportCtor` argument. **]**
|
||||
|
||||
### Constructors
|
||||
#### Client(transport, connectionString) constructor
|
||||
|
||||
|
@ -48,8 +20,6 @@ azure-iot-device.InternalClient is an internal class which provides a means for
|
|||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_045: [** If the transport successfully establishes a connection the `open` method shall subscribe to the `disconnect` event of the transport. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_020: [** The `open` function should start listening for C2D messages if there are listeners on the `message` event **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_064: [** The `open` method shall call the `openCallback` immediately with a null error object and a `results.Connected()` object if called while renewing the shared access signature. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_061: [** The `open` method shall not throw if the `openCallback` callback has not been provided. **]**
|
||||
|
@ -155,18 +125,6 @@ The `sendEventBatch` method sends a list of event messages to the IoT Hub as the
|
|||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_036: [** The `updateSharedAccessSignature` method shall call the `done` callback with a `null` error object and a result of type SharedAccessSignatureUpdated if the token was updated successfully. **]**
|
||||
|
||||
#### uploadToBlob(blobName, stream, done)
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_037: [** The `uploadToBlob` method shall throw a `ReferenceError` if `blobName` is falsy. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_038: [** The `uploadToBlob` method shall throw a `ReferenceError` if `stream` is falsy. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_039: [** The `uploadToBlob` method shall throw a `ReferenceError` if `streamLength` is falsy. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_040: [** The `uploadToBlob` method shall call the `done` callback with an `Error` object if the upload fails. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_041: [** The `uploadToBlob` method shall call the `done` callback no parameters if the upload succeeds. **]**
|
||||
|
||||
#### getTwin(done)
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_094: [** If this is the first call to `getTwin` the method shall instantiate a new `Twin` object and pass it the transport currently in use. **]**
|
||||
|
@ -225,53 +183,7 @@ interface DeviceMethodEventHandler {
|
|||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_086: [** Any operation (such as `sendEvent` or `onDeviceMethod`) happening after a `setRetryPolicy` call should use the policy set during that call. **]*
|
||||
|
||||
#### sendOutputEvent(outputName: string, message: Message, callback: (err?: Error, result?: results.MessageEnqueued) => void): void;
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_010: [** The `sendOutputEvent` method shall send the event indicated by the `message` argument via the transport associated with the Client instance. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_018: [** When the `sendOutputEvent` method completes, the `callback` function shall be invoked with the same arguments as the underlying transport method's callback. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_019: [** The `sendOutputEvent` method shall not throw if the `callback` is not passed. **]**
|
||||
|
||||
#### sendOutputEventBatch(outputName: string, messages: Message[], callback: (err?: Error, result?: results.MessageEnqueued) => void): void
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_011: [** The `sendOutputEventBatch` method shall send the list of events (indicated by the `messages` argument) via the transport associated with the Client instance. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_021: [** When the `sendOutputEventBatch` method completes the `callback` function shall be invoked with the same arguments as the underlying transport method's callback. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_022: [** The `sendOutputEventBatch` method shall not throw if the `callback` is not passed. **]**
|
||||
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_096: [** The `setRetryPolicy` method shall call the `setRetryPolicy` method on the twin if it is set and pass it the `policy` object. **]**
|
||||
|
||||
### Events
|
||||
#### message
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_002: [** The `message` event shall be emitted when a cloud-to-device message is received from the IoT Hub service. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_003: [** The `message` event parameter shall be a `message` object. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_004: [** The client shall start listening for messages from the service whenever there is a listener subscribed to the `message` event. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_005: [** The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `message` event. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_065: [** The client shall connect the transport if needed in order to receive messages. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_066: [** The client shall emit an error if connecting the transport fails while subscribing to message events **]**
|
||||
|
||||
#### inputMessage
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_012: [** The `inputMessage` event shall be emitted when an inputMessage is received from the IoT Hub service. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_013: [** The `inputMessage` event parameters shall be the inputName for the message and a `Message` object. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_014: [** The client shall start listening for messages from the service whenever there is a listener subscribed to the `inputMessage` event. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_015: [** The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `inputMessage` event. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_016: [** The client shall connect the transport if needed in order to receive inputMessages. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_18_017: [** The client shall emit an `error` if connecting the transport fails while subscribing to `inputMessage` events. **]**
|
||||
|
||||
#### error
|
||||
|
||||
|
@ -281,10 +193,6 @@ interface DeviceMethodEventHandler {
|
|||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_019: [** The `disconnect` event shall be emitted when the client is disconnected from the server. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_097: [** If the transport emits a `disconnect` event event while the client is subscribed to c2d messages the retry policy shall be used to reconnect and re-enable the feature using the transport `enableC2D` method. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_102: [** If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_098: [** If the transport emits a `disconnect` event while the client is subscribed to direct methods the retry policy shall be used to reconnect and re-enable the feature using the transport `enableMethods` method. **]**
|
||||
|
||||
**SRS_NODE_INTERNAL_CLIENT_16_100: [** If the retry policy fails to reestablish the direct methods functionality a `disconnect` event shall be emitted with a `results.Disconnected` object. **]**
|
||||
|
|
|
@ -1,9 +1,50 @@
|
|||
# azure-iot-device.InternalClient Requirements
|
||||
# ModuleClient requirements
|
||||
|
||||
The `ModuleClient` class is used to connect modules to Azure IoT Hub and Azure Edge hub. Devices use the `Client` class.
|
||||
|
||||
## Overview
|
||||
azure-iot-device.ModuleClient is a type that captures the functionality needed to connect and communicate with an IoT Hub or an IoT Edge Hub service instance using a module identity.
|
||||
|
||||
## Public Interface
|
||||
```typescript
|
||||
class ModuleClient extends InternalClient {
|
||||
sendOutputEvent(outputName: string, message: Message, callback: (err?: Error, result?: results.MessageEnqueued) => void): void;
|
||||
sendOutputEventBatch(outputName: string, messages: Message[], callback: (err?: Error, result?: results.MessageEnqueued) => void): void;
|
||||
on(type = 'inputMessage', msgHandler: (msg: Message) => void): void;
|
||||
|
||||
static fromConnectionString(connStr: string, transportCtor: any): any;
|
||||
static fromSharedAccessSignature(sharedAccessSignature: string, transportCtor: any): any;
|
||||
static fromAuthenticationProvider(authenticationProvider: AuthenticationProvider, transportCtor: any):
|
||||
}
|
||||
```
|
||||
|
||||
## Public API
|
||||
|
||||
### fromConnectionString
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_05_003: [** The `fromConnectionString` method shall throw ReferenceError if the connStr argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_05_006: [** The `fromConnectionString` method shall return a new instance of the `Client` object, as by a call to `new Client(new Transport(...))`. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_087: [** The `fromConnectionString` method shall create a new `SharedAccessKeyAuthorizationProvider` object with the connection string passed as argument if it contains a SharedAccessKey parameter and pass this object to the transport constructor. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_001: [** The `fromConnectionString` method shall throw a `NotImplementedError` if the connection string does not contain a `SharedAccessKey` field because x509 authentication is not supported yet for modules. **]**
|
||||
|
||||
### fromSharedAccessSignature
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_029: [** The `fromSharedAccessSignature` method shall throw a `ReferenceError` if the sharedAccessSignature argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_030: [** The `fromSharedAccessSignature` method shall return a new instance of the `Client` object **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_088: [** The `fromSharedAccessSignature` method shall create a new `SharedAccessSignatureAuthorizationProvider` object with the shared access signature passed as argument, and pass this object to the transport constructor. **]**
|
||||
|
||||
### fromAuthenticationProvider
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_089: [** The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `authenticationProvider` argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_092: [** The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `transportCtor` argument is falsy. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_090: [** The `fromAuthenticationProvider` method shall pass the `authenticationProvider` object passed as argument to the transport constructor. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_091: [** The `fromAuthenticationProvider` method shall return a `Client` object configured with a new instance of a transport created using the `transportCtor` argument. **]**
|
||||
|
||||
#### fromEnvironment
|
||||
|
||||
|
@ -24,3 +65,38 @@ azure-iot-device.ModuleClient is a type that captures the functionality needed t
|
|||
**SRS_NODE_MODULE_CLIENT_13_031: [** The `fromEnvironment` method shall invoke the callback with a new instance of the `ModuleClient` object. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_13_032: [** The `fromEnvironment` method shall create a new `IotEdgeAuthenticationProvider` object and pass this to the transport constructor. **]**
|
||||
|
||||
### sendOutputEvent(outputName: string, message: Message, callback: (err?: Error, result?: results.MessageEnqueued) => void): void;
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_010: [** The `sendOutputEvent` method shall send the event indicated by the `message` argument via the transport associated with the Client instance. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_018: [** When the `sendOutputEvent` method completes, the `callback` function shall be invoked with the same arguments as the underlying transport method's callback. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_019: [** The `sendOutputEvent` method shall not throw if the `callback` is not passed. **]**
|
||||
|
||||
### sendOutputEventBatch(outputName: string, messages: Message[], callback: (err?: Error, result?: results.MessageEnqueued) => void): void
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_011: [** The `sendOutputEventBatch` method shall send the list of events (indicated by the `messages` argument) via the transport associated with the Client instance. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_021: [** When the `sendOutputEventBatch` method completes the `callback` function shall be invoked with the same arguments as the underlying transport method's callback. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_022: [** The `sendOutputEventBatch` method shall not throw if the `callback` is not passed. **]**
|
||||
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_096: [** The `setRetryPolicy` method shall call the `setRetryPolicy` method on the twin if it is set and pass it the `policy` object. **]**
|
||||
|
||||
### on('inputMessage', msgHandler)
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_012: [** The `inputMessage` event shall be emitted when an inputMessage is received from the IoT Hub service. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_013: [** The `inputMessage` event parameters shall be the inputName for the message and a `Message` object. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_014: [** The client shall start listening for messages from the service whenever there is a listener subscribed to the `inputMessage` event. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_015: [** The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `inputMessage` event. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_016: [** The client shall connect the transport if needed in order to receive inputMessages. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_18_017: [** The client shall emit an `error` if connecting the transport fails while subscribing to `inputMessage` events. **]**
|
||||
|
||||
**SRS_NODE_MODULE_CLIENT_16_097: [** The client shall emit an `error` if connecting the transport fails while unsubscribing to `inputMessage` events. **]**
|
|
@ -37,7 +37,7 @@
|
|||
"alltest": "istanbul cover node_modules/mocha/bin/_mocha -- --reporter spec \"test/**/_*_test*.js\"",
|
||||
"ci": "npm -s run lint && npm -s run build && npm -s run alltest-min && npm -s run check-cover",
|
||||
"test": "npm -s run lint && npm -s run build && npm -s run unittest",
|
||||
"check-cover": "istanbul check-coverage --statements 95 --branches 86 --lines 97 --functions 91"
|
||||
"check-cover": "istanbul check-coverage --statements 95 --branches 86 --lines 97 --functions 92"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">= 0.10"
|
||||
|
|
|
@ -4,15 +4,19 @@
|
|||
'use strict';
|
||||
|
||||
import { Stream } from 'stream';
|
||||
import { EventEmitter } from 'events';
|
||||
import * as dbg from 'debug';
|
||||
const debug = dbg('azure-iot-device:InternalClient');
|
||||
|
||||
import { errors, results, Message, AuthenticationProvider, RetryPolicy } from 'azure-iot-common';
|
||||
import { AuthenticationProvider, RetryOperation, ConnectionString, results } from 'azure-iot-common';
|
||||
import { InternalClient, DeviceTransport } from './internal_client';
|
||||
import { BlobUploadClient } from './blob_upload';
|
||||
import { DeviceMethodRequest, DeviceMethodResponse } from './device_method';
|
||||
import { DeviceClientOptions } from './interfaces';
|
||||
import { Twin } from './twin';
|
||||
import { SharedAccessSignatureAuthenticationProvider } from './sas_authentication_provider';
|
||||
import { X509AuthenticationProvider } from './x509_authentication_provider';
|
||||
import { SharedAccessKeyAuthenticationProvider } from './sak_authentication_provider';
|
||||
|
||||
function safeCallback(callback?: (err?: Error, result?: any) => void, error?: Error, result?: any): void {
|
||||
if (callback) callback(error, result);
|
||||
}
|
||||
/**
|
||||
* IoT Hub device client used to connect a device with an Azure IoT hub.
|
||||
*
|
||||
|
@ -21,9 +25,10 @@ import { Twin } from './twin';
|
|||
* or {@link azure-iot-device.Client.fromSharedAccessSignature|fromSharedAccessSignature}
|
||||
* to create an IoT Hub device client.
|
||||
*/
|
||||
export class Client extends EventEmitter {
|
||||
private _internalClient: InternalClient;
|
||||
|
||||
export class Client extends InternalClient {
|
||||
private _c2dEnabled: boolean;
|
||||
private _deviceDisconnectHandler: (err?: Error, result?: any) => void;
|
||||
private blobUploadClient: BlobUploadClient; // Casing is wrong and should be corrected.
|
||||
/**
|
||||
* @constructor
|
||||
* @param {Object} transport An object that implements the interface
|
||||
|
@ -33,157 +38,61 @@ export class Client extends EventEmitter {
|
|||
* @param {Object} blobUploadClient An object that is capable of uploading a stream to a blob.
|
||||
*/
|
||||
constructor(transport: DeviceTransport, connStr?: string, blobUploadClient?: BlobUploadClient) {
|
||||
super();
|
||||
this._internalClient = new InternalClient(transport, connStr, blobUploadClient);
|
||||
super(transport, connStr);
|
||||
this.blobUploadClient = blobUploadClient;
|
||||
this._c2dEnabled = false;
|
||||
|
||||
this.on('newListener', (event, listener) => {
|
||||
if (event === 'inputMessage') {
|
||||
throw new errors.ArgumentError('The Client object does not support \'inputMessage\' events. You need to use a ModuleClient object for that.');
|
||||
this.on('removeListener', (eventName) => {
|
||||
if (eventName === 'message' && this.listeners('message').length === 0) {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_005: [The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `message` event.]*/
|
||||
this._disableC2D((err) => {
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
}
|
||||
});
|
||||
}
|
||||
this._internalClient.on(event, listener);
|
||||
});
|
||||
|
||||
this.on('removeListener', (event, listener) => {
|
||||
this._internalClient.removeListener(event, listener);
|
||||
this.on('newListener', (eventName) => {
|
||||
if (eventName === 'message') {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_004: [The client shall start listening for messages from the service whenever there is a listener subscribed to the `message` event.]*/
|
||||
this._enableC2D((err) => {
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_002: [The `message` event shall be emitted when a cloud-to-device message is received from the IoT Hub service.]*/
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_003: [The `message` event parameter shall be a `message` object.]*/
|
||||
this._transport.on('message', (msg) => {
|
||||
this.emit('message', msg);
|
||||
});
|
||||
|
||||
this._deviceDisconnectHandler = (err) => {
|
||||
debug('transport disconnect event: ' + (err ? err.toString() : 'no error'));
|
||||
if (err && this._retryPolicy.shouldRetry(err)) {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_097: [If the transport emits a `disconnect` event while the client is subscribed to c2d messages the retry policy shall be used to reconnect and re-enable the feature using the transport `enableC2D` method.]*/
|
||||
if (this._c2dEnabled) {
|
||||
this._c2dEnabled = false;
|
||||
debug('re-enabling C2D link');
|
||||
this._enableC2D((err) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_102: [If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
|
||||
this.emit('disconnect', new results.Disconnected(err));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
this._transport.on('disconnect', this._deviceDisconnectHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description Registers the `callback` to be invoked when a
|
||||
* cloud-to-device method call is received by the client
|
||||
* for the given `methodName`.
|
||||
*
|
||||
* @param {String} methodName The name of the method for which the callback
|
||||
* is to be registered.
|
||||
* @param {Function} callback The callback to be invoked when the C2D method
|
||||
* call is received.
|
||||
*
|
||||
* @throws {ReferenceError} If the `methodName` or `callback` parameter
|
||||
* is falsy.
|
||||
* @throws {TypeError} If the `methodName` parameter is not a string
|
||||
* or if the `callback` is not a function.
|
||||
*/
|
||||
onDeviceMethod(methodName: string, callback: (request: DeviceMethodRequest, response: DeviceMethodResponse) => void): void {
|
||||
this._internalClient.onDeviceMethod(methodName, callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description Updates the Shared Access Signature token used by the transport to authenticate with the IoT Hub service.
|
||||
*
|
||||
* @param {String} sharedAccessSignature The new SAS token to use.
|
||||
* @param {Function} done The callback to be invoked when `updateSharedAccessSignature`
|
||||
* completes execution.
|
||||
*
|
||||
* @throws {ReferenceError} If the sharedAccessSignature parameter is falsy.
|
||||
* @throws {ReferenceError} If the client uses x509 authentication.
|
||||
*/
|
||||
updateSharedAccessSignature(sharedAccessSignature: string, updateSasCallback?: (err?: Error, result?: results.SharedAccessSignatureUpdated) => void): void {
|
||||
this._internalClient.updateSharedAccessSignature(sharedAccessSignature, updateSasCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description Call the transport layer CONNECT function if the
|
||||
* transport layer implements it
|
||||
*
|
||||
* @param {Function} openCallback The callback to be invoked when `open`
|
||||
* completes execution.
|
||||
*/
|
||||
open(openCallback: (err?: Error, result?: results.Connected) => void): void {
|
||||
this._internalClient.open(openCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The [sendEvent]{@link sendEvent} method sends an event message
|
||||
* to the IoT Hub as the device indicated by the connection string passed
|
||||
* via the constructor.
|
||||
*
|
||||
* @param {azure-iot-common.Message} message The [message]{@link azure-iot-common.Message} to be sent.
|
||||
* @param {Function} sendEventCallback The callback to be invoked when `sendEvent` completes execution.
|
||||
*/
|
||||
sendEvent(message: Message, sendEventCallback?: (err?: Error, result?: results.MessageEnqueued) => void): void {
|
||||
this._internalClient.sendEvent(message, sendEventCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The [sendEventBatch]{@link sendEventBatch} method sends a list
|
||||
* of event messages to the IoT Hub as the device indicated by the connection
|
||||
* string passed via the constructor.
|
||||
*
|
||||
* @param {array<Message>} messages Array of [Message]{@link azure-iot-common.Message}
|
||||
* objects to be sent as a batch.
|
||||
* @param {Function} sendEventBatchCallback The callback to be invoked when
|
||||
* `sendEventBatch` completes execution.
|
||||
*/
|
||||
sendEventBatch(messages: Message[], sendEventBatchCallback?: (err?: Error, result?: results.MessageEnqueued) => void): void {
|
||||
this._internalClient.sendEventBatch(messages, sendEventBatchCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `close` method directs the transport to close the current connection to the IoT Hub instance
|
||||
*
|
||||
* @param {Function} closeCallback The callback to be invoked when the connection has been closed.
|
||||
*/
|
||||
close(closeCallback?: (err?: Error, result?: results.Disconnected) => void): void {
|
||||
this._internalClient.close(closeCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use Client.setOptions instead.
|
||||
* @description The `setTransportOptions` method configures transport-specific options for the client and its underlying transport object.
|
||||
*
|
||||
* @param {Object} options The options that shall be set (see transports documentation).
|
||||
* @param {Function} done The callback that shall be invoked with either an error or a result object.
|
||||
*/
|
||||
setTransportOptions(options: any, done?: (err?: Error, result?: results.TransportConfigured) => void): void {
|
||||
this._internalClient.setTransportOptions(options, done);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `setOptions` method let the user configure the client.
|
||||
*
|
||||
* @param {Object} options The options structure
|
||||
* @param {Function} done The callback that shall be called when setOptions is finished.
|
||||
*
|
||||
* @throws {ReferenceError} If the options structure is falsy
|
||||
*/
|
||||
setOptions(options: DeviceClientOptions, done?: (err?: Error, result?: results.TransportConfigured) => void): void {
|
||||
this._internalClient.setOptions(options, done);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `complete` method directs the transport to settle the message passed as argument as 'completed'.
|
||||
*
|
||||
* @param {Message} message The message to settle.
|
||||
* @param {Function} completeCallback The callback to call when the message is completed.
|
||||
*
|
||||
* @throws {ReferenceError} If the message is falsy.
|
||||
*/
|
||||
complete(message: Message, completeCallback: (err?: Error, result?: results.MessageCompleted) => void): void {
|
||||
this._internalClient.complete(message, completeCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `reject` method directs the transport to settle the message passed as argument as 'rejected'.
|
||||
*
|
||||
* @param {Message} message The message to settle.
|
||||
* @param {Function} rejectCallback The callback to call when the message is rejected.
|
||||
*
|
||||
* @throws {ReferenceException} If the message is falsy.
|
||||
*/
|
||||
reject(message: Message, rejectCallback: (err?: Error, result?: results.MessageRejected) => void): void {
|
||||
this._internalClient.reject(message, rejectCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `abandon` method directs the transport to settle the message passed as argument as 'abandoned'.
|
||||
*
|
||||
* @param {Message} message The message to settle.
|
||||
* @param {Function} abandonCallback The callback to call when the message is abandoned.
|
||||
*
|
||||
* @throws {ReferenceException} If the message is falsy.
|
||||
*/
|
||||
abandon(message: Message, abandonCallback: (err?: Error, result?: results.MessageAbandoned) => void): void {
|
||||
this._internalClient.abandon(message, abandonCallback);
|
||||
this._transport.removeListener('disconnect', this._deviceDisconnectHandler);
|
||||
super.close(closeCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -197,28 +106,54 @@ export class Client extends EventEmitter {
|
|||
* @throws {ReferenceException} If blobName or stream or streamLength is falsy.
|
||||
*/
|
||||
uploadToBlob(blobName: string, stream: Stream, streamLength: number, done: (err?: Error) => void): void {
|
||||
this._internalClient.uploadToBlob(blobName, stream, streamLength, done);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `getTwin` method creates a Twin object and establishes a connection with the Twin service.
|
||||
*
|
||||
* @param {Function} done The callback to call when the connection is established.
|
||||
*
|
||||
*/
|
||||
getTwin(done: (err?: Error, twin?: Twin) => void): void {
|
||||
this._internalClient.getTwin(done);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the retry policy used by the client on all operations. The default is {@link azure-iot-common.ExponentialBackoffWithJitter|ExponentialBackoffWithJitter}.
|
||||
* @param policy {RetryPolicy} The retry policy that should be used for all future operations.
|
||||
*/
|
||||
setRetryPolicy(policy: RetryPolicy): void {
|
||||
this._internalClient.setRetryPolicy(policy);
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_037: [The `uploadToBlob` method shall throw a `ReferenceError` if `blobName` is falsy.]*/
|
||||
if (!blobName) throw new ReferenceError('blobName cannot be \'' + blobName + '\'');
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_038: [The `uploadToBlob` method shall throw a `ReferenceError` if `stream` is falsy.]*/
|
||||
if (!stream) throw new ReferenceError('stream cannot be \'' + stream + '\'');
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_039: [The `uploadToBlob` method shall throw a `ReferenceError` if `streamLength` is falsy.]*/
|
||||
if (!streamLength) throw new ReferenceError('streamLength cannot be \'' + streamLength + '\'');
|
||||
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_040: [The `uploadToBlob` method shall call the `done` callback with an `Error` object if the upload fails.]*/
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_041: [The `uploadToBlob` method shall call the `done` callback no parameters if the upload succeeds.]*/
|
||||
this.blobUploadClient.uploadToBlob(blobName, stream, streamLength, opCallback);
|
||||
}, (err, result) => {
|
||||
safeCallback(done, err, result);
|
||||
});
|
||||
}
|
||||
|
||||
private _enableC2D(callback: (err?: Error) => void): void {
|
||||
if (!this._c2dEnabled) {
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
this._transport.enableC2D(opCallback);
|
||||
}, (err) => {
|
||||
if (!err) {
|
||||
this._c2dEnabled = true;
|
||||
}
|
||||
callback(err);
|
||||
});
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
private _disableC2D(callback: (err?: Error) => void): void {
|
||||
if (this._c2dEnabled) {
|
||||
this._transport.disableC2D((err) => {
|
||||
if (!err) {
|
||||
this._c2dEnabled = false;
|
||||
}
|
||||
callback(err);
|
||||
});
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @method module:azure-iot-device.Client.fromConnectionString
|
||||
* @description Creates an IoT Hub device client from the given
|
||||
* connection string using the given transport type.
|
||||
*
|
||||
|
@ -228,12 +163,33 @@ export class Client extends EventEmitter {
|
|||
*
|
||||
* @throws {ReferenceError} If the connStr parameter is falsy.
|
||||
*
|
||||
* @returns {module:azure-iothub.Client}
|
||||
*/
|
||||
static fromConnectionString(connStr: string, transportCtor: any): Client {
|
||||
return InternalClient.fromConnectionString(connStr, transportCtor, Client) as Client;
|
||||
static fromConnectionString(connStr: string, transportCtor: any): any {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_05_003: [The fromConnectionString method shall throw ReferenceError if the connStr argument is falsy.]*/
|
||||
if (!connStr) throw new ReferenceError('connStr is \'' + connStr + '\'');
|
||||
|
||||
const cn = ConnectionString.parse(connStr);
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_087: [The `fromConnectionString` method shall create a new `SharedAccessKeyAuthorizationProvider` object with the connection string passed as argument if it contains a SharedAccessKey parameter and pass this object to the transport constructor.]*/
|
||||
let authenticationProvider: AuthenticationProvider;
|
||||
|
||||
if (cn.SharedAccessKey) {
|
||||
authenticationProvider = SharedAccessKeyAuthenticationProvider.fromConnectionString(connStr);
|
||||
} else {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_093: [The `fromConnectionString` method shall create a new `X509AuthorizationProvider` object with the connection string passed as argument if it contains an X509 parameter and pass this object to the transport constructor.]*/
|
||||
authenticationProvider = new X509AuthenticationProvider({
|
||||
host: cn.HostName,
|
||||
deviceId: cn.DeviceId
|
||||
});
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_05_006: [The fromConnectionString method shall return a new instance of the Client object, as by a call to new Client(new transportCtor(...)).]*/
|
||||
return new Client(new transportCtor(authenticationProvider), null, new BlobUploadClient(authenticationProvider));
|
||||
}
|
||||
|
||||
/**
|
||||
* @method module:azure-iot-device.Client.fromSharedAccessSignature
|
||||
* @description Creates an IoT Hub device client from the given
|
||||
* shared access signature using the given transport type.
|
||||
*
|
||||
|
@ -243,17 +199,38 @@ export class Client extends EventEmitter {
|
|||
*
|
||||
* @throws {ReferenceError} If the connStr parameter is falsy.
|
||||
*
|
||||
* @returns {module:azure-iothub.Client}
|
||||
*/
|
||||
static fromSharedAccessSignature(sharedAccessSignature: string, transportCtor: any): Client {
|
||||
return InternalClient.fromSharedAccessSignature(sharedAccessSignature, transportCtor, Client) as Client;
|
||||
static fromSharedAccessSignature(sharedAccessSignature: string, transportCtor: any): any {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_029: [The fromSharedAccessSignature method shall throw a ReferenceError if the sharedAccessSignature argument is falsy.] */
|
||||
if (!sharedAccessSignature) throw new ReferenceError('sharedAccessSignature is \'' + sharedAccessSignature + '\'');
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_088: [The `fromSharedAccessSignature` method shall create a new `SharedAccessSignatureAuthorizationProvider` object with the shared access signature passed as argument, and pass this object to the transport constructor.]*/
|
||||
const authenticationProvider = SharedAccessSignatureAuthenticationProvider.fromSharedAccessSignature(sharedAccessSignature);
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_030: [The fromSharedAccessSignature method shall return a new instance of the Client object] */
|
||||
return new Client(new transportCtor(authenticationProvider), null, new BlobUploadClient(authenticationProvider));
|
||||
}
|
||||
|
||||
/**
|
||||
* @method module:azure-iot-device.Client.fromAuthenticationMethod
|
||||
* @description Creates an IoT Hub device client from the given authentication method and using the given transport type.
|
||||
* @param authenticationProvider Object used to obtain the authentication parameters for the IoT hub.
|
||||
* @param transportCtor Transport protocol used to connect to IoT hub.
|
||||
*/
|
||||
static fromAuthenticationProvider(authenticationProvider: AuthenticationProvider, transportCtor: any): Client {
|
||||
return InternalClient.fromAuthenticationProvider(authenticationProvider, transportCtor, Client) as Client;
|
||||
static fromAuthenticationProvider(authenticationProvider: AuthenticationProvider, transportCtor: any): any {
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_089: [The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `authenticationProvider` argument is falsy.]*/
|
||||
if (!authenticationProvider) {
|
||||
throw new ReferenceError('authenticationMethod cannot be \'' + authenticationProvider + '\'');
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_092: [The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `transportCtor` argument is falsy.]*/
|
||||
if (!transportCtor) {
|
||||
throw new ReferenceError('transportCtor cannot be \'' + transportCtor + '\'');
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_090: [The `fromAuthenticationProvider` method shall pass the `authenticationProvider` object passed as argument to the transport constructor.]*/
|
||||
/*Codes_SRS_NODE_DEVICE_CLIENT_16_091: [The `fromAuthenticationProvider` method shall return a `Client` object configured with a new instance of a transport created using the `transportCtor` argument.]*/
|
||||
return new Client(new transportCtor(authenticationProvider), null, new BlobUploadClient(authenticationProvider));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,16 +8,11 @@ import { EventEmitter } from 'events';
|
|||
import * as dbg from 'debug';
|
||||
const debug = dbg('azure-iot-device:InternalClient');
|
||||
|
||||
import { results, errors, Message, X509, AuthenticationProvider } from 'azure-iot-common';
|
||||
import { results, errors, Message, X509 } from 'azure-iot-common';
|
||||
import { SharedAccessSignature as CommonSharedAccessSignature } from 'azure-iot-common';
|
||||
import { ExponentialBackOffWithJitter, RetryPolicy, RetryOperation } from 'azure-iot-common';
|
||||
import * as ConnectionString from './connection_string.js';
|
||||
import { BlobUploadClient } from './blob_upload';
|
||||
import { DeviceMethodRequest, DeviceMethodResponse } from './device_method';
|
||||
import { Twin, TwinProperties } from './twin';
|
||||
import { SharedAccessKeyAuthenticationProvider } from './sak_authentication_provider';
|
||||
import { SharedAccessSignatureAuthenticationProvider } from './sas_authentication_provider';
|
||||
import { X509AuthenticationProvider } from './x509_authentication_provider';
|
||||
import { DeviceClientOptions } from './interfaces';
|
||||
|
||||
/**
|
||||
|
@ -33,7 +28,7 @@ function safeCallback(callback?: (err?: Error, result?: any) => void, error?: Er
|
|||
/**
|
||||
* @private
|
||||
*/
|
||||
export class InternalClient extends EventEmitter {
|
||||
export abstract class InternalClient extends EventEmitter {
|
||||
// SAS token created by the client have a lifetime of 60 minutes, renew every 45 minutes
|
||||
/**
|
||||
* @private
|
||||
|
@ -55,40 +50,25 @@ export class InternalClient extends EventEmitter {
|
|||
* Maximum timeout (in milliseconds) used to consider an operation failed.
|
||||
* The operation will be retried according to the retry policy set with {@link azure-iot-device.Client.setRetryPolicy} method (or {@link azure-iot-common.ExponentialBackoffWithJitter} by default) until this value is reached.)
|
||||
*/
|
||||
private _maxOperationTimeout: number;
|
||||
protected _maxOperationTimeout: number;
|
||||
protected _retryPolicy: RetryPolicy;
|
||||
|
||||
private _methodCallbackMap: any;
|
||||
private _disconnectHandler: (err?: Error, result?: any) => void;
|
||||
private blobUploadClient: BlobUploadClient; // TODO: wrong casing/naming convention
|
||||
private _c2dEnabled: boolean;
|
||||
private _methodsEnabled: boolean;
|
||||
private _inputMessagesEnabled: boolean;
|
||||
|
||||
private _retryPolicy: RetryPolicy;
|
||||
|
||||
constructor(transport: DeviceTransport, connStr?: string, blobUploadClient?: BlobUploadClient) {
|
||||
constructor(transport: DeviceTransport, connStr?: string) {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_05_001: [The Client constructor shall throw ReferenceError if the transport argument is falsy.]*/
|
||||
if (!transport) throw new ReferenceError('transport is \'' + transport + '\'');
|
||||
|
||||
super();
|
||||
this._c2dEnabled = false;
|
||||
this._methodsEnabled = false;
|
||||
this._inputMessagesEnabled = false;
|
||||
this.blobUploadClient = blobUploadClient;
|
||||
|
||||
if (connStr) {
|
||||
throw new errors.InvalidOperationError('the connectionString parameter of the constructor is not used - users of the SDK should be using the `fromConnectionString` factory method.');
|
||||
}
|
||||
|
||||
this._transport = transport;
|
||||
this._transport.on('message', (msg) => {
|
||||
this.emit('message', msg);
|
||||
});
|
||||
|
||||
/* Codes_SRS_NODE_INTERNAL_CLIENT_18_012: [ The `inputMessage` event shall be emitted when an inputMessage is received from the IoT Hub service. ]*/
|
||||
/* Codes_SRS_NODE_INTERNAL_CLIENT_18_013: [ The `inputMessage` event parameters shall be the inputName for the message and a `Message` object. ]*/
|
||||
this._transport.on('inputMessage', (inputName, msg) => {
|
||||
this.emit('inputMessage', inputName, msg);
|
||||
});
|
||||
|
||||
this._transport.on('error', (err) => {
|
||||
// errors right now bubble up through the disconnect handler.
|
||||
|
@ -98,65 +78,9 @@ export class InternalClient extends EventEmitter {
|
|||
|
||||
this._methodCallbackMap = {};
|
||||
|
||||
this.on('removeListener', (eventName) => {
|
||||
if (eventName === 'message' && this.listeners('message').length === 0) {
|
||||
this._disableC2D((err) => {
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
}
|
||||
});
|
||||
} else if (eventName === 'inputMessage' && this.listeners('inputMessage').length === 0) {
|
||||
/* Codes_SRS_NODE_INTERNAL_CLIENT_18_015: [ The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `inputMessage` event. ]*/
|
||||
this._disableInputMessages((err) => {
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
this.on('newListener', (eventName) => {
|
||||
if (eventName === 'message') {
|
||||
this._enableC2D((err) => {
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
}
|
||||
});
|
||||
} else if (eventName === 'inputMessage') {
|
||||
/* Codes_SRS_NODE_INTERNAL_CLIENT_18_014: [ The client shall start listening for messages from the service whenever there is a listener subscribed to the `inputMessage` event. ]*/
|
||||
this._enableInputMessages((err) => {
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
this._disconnectHandler = (err) => {
|
||||
debug('transport disconnect event: ' + (err ? err.toString() : 'no error'));
|
||||
if (err && this._retryPolicy.shouldRetry(err)) {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_097: [If the transport emits a `disconnect` event while the client is subscribed to c2d messages the retry policy shall be used to reconnect and re-enable the feature using the transport `enableC2D` method.]*/
|
||||
if (this._c2dEnabled) {
|
||||
this._c2dEnabled = false;
|
||||
debug('re-enabling C2D link');
|
||||
this._enableC2D((err) => {
|
||||
if (err) {
|
||||
this.emit('disconnect', new results.Disconnected(err));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (this._inputMessagesEnabled) {
|
||||
this._inputMessagesEnabled = false;
|
||||
debug('re-enabling input message link');
|
||||
this._enableInputMessages((err) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_102: [If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
|
||||
this.emit('disconnect', new results.Disconnected(err));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_098: [If the transport emits a `disconnect` event while the client is subscribed to direct methods the retry policy shall be used to reconnect and re-enable the feature using the transport `enableMethods` method.]*/
|
||||
if (this._methodsEnabled) {
|
||||
this._methodsEnabled = false;
|
||||
|
@ -336,24 +260,6 @@ export class InternalClient extends EventEmitter {
|
|||
});
|
||||
}
|
||||
|
||||
uploadToBlob(blobName: string, stream: Stream, streamLength: number, done: (err?: Error) => void): void {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_037: [The `uploadToBlob` method shall throw a `ReferenceError` if `blobName` is falsy.]*/
|
||||
if (!blobName) throw new ReferenceError('blobName cannot be \'' + blobName + '\'');
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_038: [The `uploadToBlob` method shall throw a `ReferenceError` if `stream` is falsy.]*/
|
||||
if (!stream) throw new ReferenceError('stream cannot be \'' + stream + '\'');
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_039: [The `uploadToBlob` method shall throw a `ReferenceError` if `streamLength` is falsy.]*/
|
||||
if (!streamLength) throw new ReferenceError('streamLength cannot be \'' + streamLength + '\'');
|
||||
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_040: [The `uploadToBlob` method shall call the `done` callback with an `Error` object if the upload fails.]*/
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_041: [The `uploadToBlob` method shall call the `done` callback no parameters if the upload succeeds.]*/
|
||||
this.blobUploadClient.uploadToBlob(blobName, stream, streamLength, opCallback);
|
||||
}, (err, result) => {
|
||||
safeCallback(done, err, result);
|
||||
});
|
||||
}
|
||||
|
||||
getTwin(done: (err?: Error, twin?: Twin) => void): void {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_094: [If this is the first call to `getTwin` the method shall instantiate a new `Twin` object and pass it the transport currently in use.]*/
|
||||
if (!this._twin) {
|
||||
|
@ -389,30 +295,6 @@ export class InternalClient extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
sendOutputEvent(outputName: string, message: Message, callback: (err?: Error, result?: results.MessageEnqueued) => void): void {
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
/* Codes_SRS_NODE_INTERNAL_CLIENT_18_010: [ The `sendOutputEvent` method shall send the event indicated by the `message` argument via the transport associated with the Client instance. ]*/
|
||||
this._transport.sendOutputEvent(outputName, message, opCallback);
|
||||
}, (err, result) => {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_18_018: [ When the `sendOutputEvent` method completes, the `callback` function shall be invoked with the same arguments as the underlying transport method's callback. ]*/
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_18_019: [ The `sendOutputEvent` method shall not throw if the `callback` is not passed. ]*/
|
||||
safeCallback(callback, err, result);
|
||||
});
|
||||
}
|
||||
|
||||
sendOutputEventBatch(outputName: string, messages: Message[], callback: (err?: Error, result?: results.MessageEnqueued) => void): void {
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
/* Codes_SRS_NODE_INTERNAL_CLIENT_18_011: [ The `sendOutputEventBatch` method shall send the list of events (indicated by the `messages` argument) via the transport associated with the Client instance. ]*/
|
||||
this._transport.sendOutputEventBatch(outputName, messages, opCallback);
|
||||
}, (err, result) => {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_18_021: [ When the `sendOutputEventBatch` method completes the `callback` function shall be invoked with the same arguments as the underlying transport method's callback. ]*/
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_18_022: [ The `sendOutputEventBatch` method shall not throw if the `callback` is not passed. ]*/
|
||||
safeCallback(callback, err, result);
|
||||
});
|
||||
}
|
||||
|
||||
private _validateDeviceMethodInputs(methodName: string, callback: (request: DeviceMethodRequest, response: DeviceMethodResponse) => void): void {
|
||||
// Codes_SRS_NODE_INTERNAL_CLIENT_13_020: [ onDeviceMethod shall throw a ReferenceError if methodName is falsy. ]
|
||||
if (!methodName) {
|
||||
|
@ -457,65 +339,6 @@ export class InternalClient extends EventEmitter {
|
|||
});
|
||||
}
|
||||
|
||||
private _enableC2D(callback: (err?: Error) => void): void {
|
||||
if (!this._c2dEnabled) {
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
this._transport.enableC2D(opCallback);
|
||||
}, (err) => {
|
||||
if (!err) {
|
||||
this._c2dEnabled = true;
|
||||
}
|
||||
callback(err);
|
||||
});
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
private _disableInputMessages(callback: (err?: Error) => void): void {
|
||||
if (this._inputMessagesEnabled) {
|
||||
this._transport.disableInputMessages((err) => {
|
||||
if (!err) {
|
||||
this._inputMessagesEnabled = false;
|
||||
}
|
||||
callback(err);
|
||||
});
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
private _enableInputMessages(callback: (err?: Error) => void): void {
|
||||
if (!this._inputMessagesEnabled) {
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
/* Codes_SRS_NODE_INTERNAL_CLIENT_18_016: [ The client shall connect the transport if needed in order to receive inputMessages. ]*/
|
||||
this._transport.enableInputMessages(opCallback);
|
||||
}, (err) => {
|
||||
if (!err) {
|
||||
this._inputMessagesEnabled = true;
|
||||
}
|
||||
callback(err);
|
||||
});
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
private _disableC2D(callback: (err?: Error) => void): void {
|
||||
if (this._c2dEnabled) {
|
||||
this._transport.disableC2D((err) => {
|
||||
if (!err) {
|
||||
this._c2dEnabled = false;
|
||||
}
|
||||
callback(err);
|
||||
});
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
private _enableMethods(callback: (err?: Error) => void): void {
|
||||
if (!this._methodsEnabled) {
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
|
@ -560,88 +383,6 @@ export class InternalClient extends EventEmitter {
|
|||
onDisconnected(disconnectError, disconnectResult);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @method module:azure-iot-device.Client.fromConnectionString
|
||||
* @description Creates an IoT Hub device client from the given
|
||||
* connection string using the given transport type.
|
||||
*
|
||||
* @param {String} connStr A connection string which encapsulates "device
|
||||
* connect" permissions on an IoT hub.
|
||||
* @param {Function} Transport A transport constructor.
|
||||
*
|
||||
* @throws {ReferenceError} If the connStr parameter is falsy.
|
||||
*
|
||||
* @returns {module:azure-iothub.Client}
|
||||
*/
|
||||
static fromConnectionString(connStr: string, transportCtor: any, clientCtor: any): any {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_05_003: [The fromConnectionString method shall throw ReferenceError if the connStr argument is falsy.]*/
|
||||
if (!connStr) throw new ReferenceError('connStr is \'' + connStr + '\'');
|
||||
|
||||
const cn = ConnectionString.parse(connStr);
|
||||
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_087: [The `fromConnectionString` method shall create a new `SharedAccessKeyAuthorizationProvider` object with the connection string passed as argument if it contains a SharedAccessKey parameter and pass this object to the transport constructor.]*/
|
||||
let authenticationProvider: AuthenticationProvider;
|
||||
|
||||
if (cn.SharedAccessKey) {
|
||||
authenticationProvider = SharedAccessKeyAuthenticationProvider.fromConnectionString(connStr);
|
||||
} else {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_093: [The `fromConnectionString` method shall create a new `X509AuthorizationProvider` object with the connection string passed as argument if it contains an X509 parameter and pass this object to the transport constructor.]*/
|
||||
authenticationProvider = new X509AuthenticationProvider({
|
||||
host: cn.HostName,
|
||||
deviceId: cn.DeviceId
|
||||
});
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_05_006: [The fromConnectionString method shall return a new instance of the Client object, as by a call to new Client(new transportCtor(...)).]*/
|
||||
return new clientCtor(new transportCtor(authenticationProvider), null, new BlobUploadClient(authenticationProvider));
|
||||
}
|
||||
|
||||
/**
|
||||
* @method module:azure-iot-device.Client.fromSharedAccessSignature
|
||||
* @description Creates an IoT Hub device client from the given
|
||||
* shared access signature using the given transport type.
|
||||
*
|
||||
* @param {String} sharedAccessSignature A shared access signature which encapsulates "device
|
||||
* connect" permissions on an IoT hub.
|
||||
* @param {Function} Transport A transport constructor.
|
||||
*
|
||||
* @throws {ReferenceError} If the connStr parameter is falsy.
|
||||
*
|
||||
* @returns {module:azure-iothub.Client}
|
||||
*/
|
||||
static fromSharedAccessSignature(sharedAccessSignature: string, transportCtor: any, clientCtor: any): any {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_029: [The fromSharedAccessSignature method shall throw a ReferenceError if the sharedAccessSignature argument is falsy.] */
|
||||
if (!sharedAccessSignature) throw new ReferenceError('sharedAccessSignature is \'' + sharedAccessSignature + '\'');
|
||||
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_088: [The `fromSharedAccessSignature` method shall create a new `SharedAccessSignatureAuthorizationProvider` object with the shared access signature passed as argument, and pass this object to the transport constructor.]*/
|
||||
const authenticationProvider = SharedAccessSignatureAuthenticationProvider.fromSharedAccessSignature(sharedAccessSignature);
|
||||
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_030: [The fromSharedAccessSignature method shall return a new instance of the Client object] */
|
||||
return new clientCtor(new transportCtor(authenticationProvider), null, new BlobUploadClient(authenticationProvider));
|
||||
}
|
||||
|
||||
/**
|
||||
* @method module:azure-iot-device.Client.fromAuthenticationMethod
|
||||
* @description Creates an IoT Hub device client from the given authentication method and using the given transport type.
|
||||
* @param authenticationProvider Object used to obtain the authentication parameters for the IoT hub.
|
||||
* @param transportCtor Transport protocol used to connect to IoT hub.
|
||||
*/
|
||||
static fromAuthenticationProvider(authenticationProvider: AuthenticationProvider, transportCtor: any, clientCtor: any): any {
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_089: [The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `authenticationProvider` argument is falsy.]*/
|
||||
if (!authenticationProvider) {
|
||||
throw new ReferenceError('authenticationMethod cannot be \'' + authenticationProvider + '\'');
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_092: [The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `transportCtor` argument is falsy.]*/
|
||||
if (!transportCtor) {
|
||||
throw new ReferenceError('transportCtor cannot be \'' + transportCtor + '\'');
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_090: [The `fromAuthenticationProvider` method shall pass the `authenticationProvider` object passed as argument to the transport constructor.]*/
|
||||
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_091: [The `fromAuthenticationProvider` method shall return a `Client` object configured with a new instance of a transport created using the `transportCtor` argument.]*/
|
||||
return new clientCtor(new transportCtor(authenticationProvider), null, new BlobUploadClient(authenticationProvider));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -3,15 +3,20 @@
|
|||
|
||||
'use strict';
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
import * as dbg from 'debug';
|
||||
const debug = dbg('azure-iot-device:ModuleClient');
|
||||
|
||||
import { results, Message, AuthenticationProvider, RetryPolicy, errors } from 'azure-iot-common';
|
||||
import { results, Message, RetryOperation, ConnectionString, AuthenticationProvider } from 'azure-iot-common';
|
||||
import { InternalClient, DeviceTransport } from './internal_client';
|
||||
import { DeviceMethodRequest, DeviceMethodResponse } from './device_method';
|
||||
import { DeviceClientOptions } from './interfaces';
|
||||
import { Twin } from './twin';
|
||||
import { errors } from 'azure-iot-common';
|
||||
import { SharedAccessKeyAuthenticationProvider } from './sak_authentication_provider';
|
||||
import { SharedAccessSignatureAuthenticationProvider } from './sas_authentication_provider';
|
||||
import { IotEdgeAuthenticationProvider } from './iotedge_authentication_provider';
|
||||
|
||||
function safeCallback(callback?: (err?: Error, result?: any) => void, error?: Error, result?: any): void {
|
||||
if (callback) callback(error, result);
|
||||
}
|
||||
|
||||
/**
|
||||
* IoT Hub device client used to connect a device with an Azure IoT hub.
|
||||
*
|
||||
|
@ -20,8 +25,9 @@ import { IotEdgeAuthenticationProvider } from './iotedge_authentication_provider
|
|||
* or {@link azure-iot-device.Client.fromSharedAccessSignature|fromSharedAccessSignature}
|
||||
* to create an IoT Hub device client.
|
||||
*/
|
||||
export class ModuleClient extends EventEmitter {
|
||||
private _internalClient: InternalClient;
|
||||
export class ModuleClient extends InternalClient {
|
||||
private _inputMessagesEnabled: boolean;
|
||||
private _moduleDisconnectHandler: (err?: Error, result?: any) => void;
|
||||
|
||||
/**
|
||||
* @constructor
|
||||
|
@ -31,138 +37,56 @@ export class ModuleClient extends EventEmitter {
|
|||
* @param {string} connStr A connection string (optional: when not provided, updateSharedAccessSignature must be called to set the SharedAccessSignature token directly).
|
||||
*/
|
||||
constructor(transport: DeviceTransport, connStr?: string) {
|
||||
super();
|
||||
this._internalClient = new InternalClient(transport, connStr);
|
||||
super(transport, connStr);
|
||||
this._inputMessagesEnabled = false;
|
||||
|
||||
this.on('newListener', (event, listener) => {
|
||||
if (event === 'message') {
|
||||
throw new errors.ArgumentError('The ModuleClient object does not support \'message\' events. You need to use a Client object for that');
|
||||
/* Codes_SRS_NODE_MODULE_CLIENT_18_012: [ The `inputMessage` event shall be emitted when an inputMessage is received from the IoT Hub service. ]*/
|
||||
/* Codes_SRS_NODE_MODULE_CLIENT_18_013: [ The `inputMessage` event parameters shall be the inputName for the message and a `Message` object. ]*/
|
||||
this._transport.on('inputMessage', (inputName, msg) => {
|
||||
this.emit('inputMessage', inputName, msg);
|
||||
});
|
||||
|
||||
this.on('removeListener', (eventName) => {
|
||||
if (eventName === 'inputMessage' && this.listeners('inputMessage').length === 0) {
|
||||
/* Codes_SRS_NODE_MODULE_CLIENT_18_015: [ The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `inputMessage` event. ]*/
|
||||
this._disableInputMessages((err) => {
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
}
|
||||
});
|
||||
}
|
||||
this._internalClient.on(event, listener);
|
||||
});
|
||||
|
||||
this.on('removeListener', (event, listener) => {
|
||||
this._internalClient.removeListener(event, listener);
|
||||
this.on('newListener', (eventName) => {
|
||||
if (eventName === 'inputMessage') {
|
||||
/* Codes_SRS_NODE_MODULE_CLIENT_18_014: [ The client shall start listening for messages from the service whenever there is a listener subscribed to the `inputMessage` event. ]*/
|
||||
this._enableInputMessages((err) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_18_017: [The client shall emit an `error` if connecting the transport fails while subscribing to `inputMessage` events.]*/
|
||||
this.emit('error', err);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @description Registers the `callback` to be invoked when a
|
||||
* cloud-to-device method call is received by the client
|
||||
* for the given `methodName`.
|
||||
*
|
||||
* @param {String} methodName The name of the method for which the callback
|
||||
* is to be registered.
|
||||
* @param {Function} callback The callback to be invoked when the C2D method
|
||||
* call is received.
|
||||
*
|
||||
* @throws {ReferenceError} If the `methodName` or `callback` parameter
|
||||
* is falsy.
|
||||
* @throws {TypeError} If the `methodName` parameter is not a string
|
||||
* or if the `callback` is not a function.
|
||||
*/
|
||||
onDeviceMethod(methodName: string, callback: (request: DeviceMethodRequest, response: DeviceMethodResponse) => void): void {
|
||||
this._internalClient.onDeviceMethod(methodName, callback);
|
||||
}
|
||||
this._moduleDisconnectHandler = (err) => {
|
||||
debug('transport disconnect event: ' + (err ? err.toString() : 'no error'));
|
||||
if (err && this._retryPolicy.shouldRetry(err)) {
|
||||
if (this._inputMessagesEnabled) {
|
||||
this._inputMessagesEnabled = false;
|
||||
debug('re-enabling input message link');
|
||||
this._enableInputMessages((err) => {
|
||||
if (err) {
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_102: [If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
|
||||
this.emit('disconnect', new results.Disconnected(err));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @description Updates the Shared Access Signature token used by the transport to authenticate with the IoT Hub service.
|
||||
*
|
||||
* @param {String} sharedAccessSignature The new SAS token to use.
|
||||
* @param {Function} done The callback to be invoked when `updateSharedAccessSignature`
|
||||
* completes execution.
|
||||
*
|
||||
* @throws {ReferenceError} If the sharedAccessSignature parameter is falsy.
|
||||
* @throws {ReferenceError} If the client uses x509 authentication.
|
||||
*/
|
||||
updateSharedAccessSignature(sharedAccessSignature: string, updateSasCallback?: (err?: Error, result?: results.SharedAccessSignatureUpdated) => void): void {
|
||||
this._internalClient.updateSharedAccessSignature(sharedAccessSignature, updateSasCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description Call the transport layer CONNECT function if the
|
||||
* transport layer implements it
|
||||
*
|
||||
* @param {Function} openCallback The callback to be invoked when `open`
|
||||
* completes execution.
|
||||
*/
|
||||
open(openCallback: (err?: Error, result?: results.Connected) => void): void {
|
||||
this._internalClient.open(openCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `close` method directs the transport to close the current connection to the IoT Hub instance
|
||||
*
|
||||
* @param {Function} closeCallback The callback to be invoked when the connection has been closed.
|
||||
*/
|
||||
close(closeCallback?: (err?: Error, result?: results.Disconnected) => void): void {
|
||||
this._internalClient.close(closeCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `setOptions` method let the user configure the client.
|
||||
*
|
||||
* @param {Object} options The options structure
|
||||
* @param {Function} done The callback that shall be called when setOptions is finished.
|
||||
*
|
||||
* @throws {ReferenceError} If the options structure is falsy
|
||||
*/
|
||||
setOptions(options: DeviceClientOptions, done?: (err?: Error, result?: results.TransportConfigured) => void): void {
|
||||
this._internalClient.setOptions(options, done);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `complete` method directs the transport to settle the message passed as argument as 'completed'.
|
||||
*
|
||||
* @param {Message} message The message to settle.
|
||||
* @param {Function} completeCallback The callback to call when the message is completed.
|
||||
*
|
||||
* @throws {ReferenceError} If the message is falsy.
|
||||
*/
|
||||
complete(message: Message, completeCallback: (err?: Error, result?: results.MessageCompleted) => void): void {
|
||||
this._internalClient.complete(message, completeCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `reject` method directs the transport to settle the message passed as argument as 'rejected'.
|
||||
*
|
||||
* @param {Message} message The message to settle.
|
||||
* @param {Function} rejectCallback The callback to call when the message is rejected.
|
||||
*
|
||||
* @throws {ReferenceException} If the message is falsy.
|
||||
*/
|
||||
reject(message: Message, rejectCallback: (err?: Error, result?: results.MessageRejected) => void): void {
|
||||
this._internalClient.reject(message, rejectCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `abandon` method directs the transport to settle the message passed as argument as 'abandoned'.
|
||||
*
|
||||
* @param {Message} message The message to settle.
|
||||
* @param {Function} abandonCallback The callback to call when the message is abandoned.
|
||||
*
|
||||
* @throws {ReferenceException} If the message is falsy.
|
||||
*/
|
||||
abandon(message: Message, abandonCallback: (err?: Error, result?: results.MessageAbandoned) => void): void {
|
||||
this._internalClient.abandon(message, abandonCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description The `getTwin` method creates a Twin object and establishes a connection with the Twin service.
|
||||
*
|
||||
* @param {Function} done The callback to call when the connection is established.
|
||||
*
|
||||
*/
|
||||
getTwin(done: (err?: Error, twin?: Twin) => void): void {
|
||||
this._internalClient.getTwin(done);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the retry policy used by the client on all operations. The default is {@link azure-iot-common.ExponentialBackoffWithJitter|ExponentialBackoffWithJitter}.
|
||||
* @param policy {RetryPolicy} The retry policy that should be used for all future operations.
|
||||
*/
|
||||
setRetryPolicy(policy: RetryPolicy): void {
|
||||
this._internalClient.setRetryPolicy(policy);
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_045: [If the transport successfully establishes a connection the `open` method shall subscribe to the `disconnect` event of the transport.]*/
|
||||
this._transport.on('disconnect', this._moduleDisconnectHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -172,7 +96,15 @@ export class ModuleClient extends EventEmitter {
|
|||
* @param callback Function to call when the operation has been queued.
|
||||
*/
|
||||
sendOutputEvent(outputName: string, message: Message, callback: (err?: Error, result?: results.MessageEnqueued) => void): void {
|
||||
this._internalClient.sendOutputEvent(outputName, message, callback);
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
/* Codes_SRS_NODE_MODULE_CLIENT_18_010: [ The `sendOutputEvent` method shall send the event indicated by the `message` argument via the transport associated with the Client instance. ]*/
|
||||
this._transport.sendOutputEvent(outputName, message, opCallback);
|
||||
}, (err, result) => {
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_18_018: [ When the `sendOutputEvent` method completes, the `callback` function shall be invoked with the same arguments as the underlying transport method's callback. ]*/
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_18_019: [ The `sendOutputEvent` method shall not throw if the `callback` is not passed. ]*/
|
||||
safeCallback(callback, err, result);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -182,27 +114,74 @@ export class ModuleClient extends EventEmitter {
|
|||
* @param callback Function to call when the operations have been queued.
|
||||
*/
|
||||
sendOutputEventBatch(outputName: string, messages: Message[], callback: (err?: Error, result?: results.MessageEnqueued) => void): void {
|
||||
this._internalClient.sendOutputEventBatch(outputName, messages, callback);
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
/* Codes_SRS_NODE_MODULE_CLIENT_18_011: [ The `sendOutputEventBatch` method shall send the list of events (indicated by the `messages` argument) via the transport associated with the Client instance. ]*/
|
||||
this._transport.sendOutputEventBatch(outputName, messages, opCallback);
|
||||
}, (err, result) => {
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_18_021: [ When the `sendOutputEventBatch` method completes the `callback` function shall be invoked with the same arguments as the underlying transport method's callback. ]*/
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_18_022: [ The `sendOutputEventBatch` method shall not throw if the `callback` is not passed. ]*/
|
||||
safeCallback(callback, err, result);
|
||||
});
|
||||
}
|
||||
|
||||
close(closeCallback?: (err?: Error, result?: results.Disconnected) => void): void {
|
||||
this._transport.removeListener('disconnect', this._moduleDisconnectHandler);
|
||||
super.close(closeCallback);
|
||||
}
|
||||
|
||||
private _disableInputMessages(callback: (err?: Error) => void): void {
|
||||
if (this._inputMessagesEnabled) {
|
||||
this._transport.disableInputMessages((err) => {
|
||||
if (!err) {
|
||||
this._inputMessagesEnabled = false;
|
||||
}
|
||||
callback(err);
|
||||
});
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
private _enableInputMessages(callback: (err?: Error) => void): void {
|
||||
if (!this._inputMessagesEnabled) {
|
||||
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
|
||||
retryOp.retry((opCallback) => {
|
||||
/* Codes_SRS_NODE_MODULE_CLIENT_18_016: [ The client shall connect the transport if needed in order to receive inputMessages. ]*/
|
||||
this._transport.enableInputMessages(opCallback);
|
||||
}, (err) => {
|
||||
if (!err) {
|
||||
this._inputMessagesEnabled = true;
|
||||
}
|
||||
callback(err);
|
||||
});
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @description Creates an IoT Hub device client from the given
|
||||
* connection string using the given transport type.
|
||||
*
|
||||
* @param {String} connStr A connection string which encapsulates "device
|
||||
* connect" permissions on an IoT hub.
|
||||
* @param {Function} Transport A transport constructor.
|
||||
*
|
||||
* @throws {ReferenceError} If the connStr parameter is falsy.
|
||||
*
|
||||
*/
|
||||
static fromConnectionString(connStr: string, transportCtor: any): ModuleClient {
|
||||
return InternalClient.fromConnectionString(connStr, transportCtor, ModuleClient) as ModuleClient;
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_05_003: [The fromConnectionString method shall throw ReferenceError if the connStr argument is falsy.]*/
|
||||
if (!connStr) throw new ReferenceError('connStr is \'' + connStr + '\'');
|
||||
|
||||
const cn = ConnectionString.parse(connStr);
|
||||
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_087: [The `fromConnectionString` method shall create a new `SharedAccessKeyAuthorizationProvider` object with the connection string passed as argument if it contains a SharedAccessKey parameter and pass this object to the transport constructor.]*/
|
||||
let authenticationProvider: AuthenticationProvider;
|
||||
|
||||
if (cn.SharedAccessKey) {
|
||||
authenticationProvider = SharedAccessKeyAuthenticationProvider.fromConnectionString(connStr);
|
||||
} else {
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_001: [The `fromConnectionString` method shall throw a `NotImplementedError` if the connection string does not contain a `SharedAccessKey` field because x509 authentication is not supported yet for modules.]*/
|
||||
throw new errors.NotImplementedError('ModuleClient only supports SAS Token authentication');
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_05_006: [The fromConnectionString method shall return a new instance of the Client object, as by a call to new Client(new transportCtor(...)).]*/
|
||||
return new ModuleClient(new transportCtor(authenticationProvider), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description Creates an IoT Hub device client from the given
|
||||
* shared access signature using the given transport type.
|
||||
* Creates an IoT Hub module client from the given shared access signature using the given transport type.
|
||||
*
|
||||
* @param {String} sharedAccessSignature A shared access signature which encapsulates "device
|
||||
* connect" permissions on an IoT hub.
|
||||
|
@ -210,65 +189,53 @@ export class ModuleClient extends EventEmitter {
|
|||
*
|
||||
* @throws {ReferenceError} If the connStr parameter is falsy.
|
||||
*
|
||||
* @returns {module:azure-iothub.Client}
|
||||
*/
|
||||
static fromSharedAccessSignature(sharedAccessSignature: string, transportCtor: any): ModuleClient {
|
||||
return InternalClient.fromSharedAccessSignature(sharedAccessSignature, transportCtor, ModuleClient) as ModuleClient;
|
||||
static fromSharedAccessSignature(sharedAccessSignature: string, transportCtor: any): any {
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_029: [The fromSharedAccessSignature method shall throw a ReferenceError if the sharedAccessSignature argument is falsy.] */
|
||||
if (!sharedAccessSignature) throw new ReferenceError('sharedAccessSignature is \'' + sharedAccessSignature + '\'');
|
||||
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_088: [The `fromSharedAccessSignature` method shall create a new `SharedAccessSignatureAuthorizationProvider` object with the shared access signature passed as argument, and pass this object to the transport constructor.]*/
|
||||
const authenticationProvider = SharedAccessSignatureAuthenticationProvider.fromSharedAccessSignature(sharedAccessSignature);
|
||||
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_030: [The fromSharedAccessSignature method shall return a new instance of the Client object] */
|
||||
return new ModuleClient(new transportCtor(authenticationProvider), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description Creates an IoT Hub device client from the given authentication method and using the given transport type.
|
||||
* Creates an IoT Hub module client from the given authentication method and using the given transport type.
|
||||
* @param authenticationProvider Object used to obtain the authentication parameters for the IoT hub.
|
||||
* @param transportCtor Transport protocol used to connect to IoT hub.
|
||||
*/
|
||||
static fromAuthenticationProvider(authenticationProvider: AuthenticationProvider, transportCtor: any): ModuleClient {
|
||||
return InternalClient.fromAuthenticationProvider(authenticationProvider, transportCtor, ModuleClient) as ModuleClient;
|
||||
}
|
||||
|
||||
static validateEnvironment(): ReferenceError {
|
||||
// Codes_SRS_NODE_MODULE_CLIENT_13_029: [ If environment variables EdgeHubConnectionString and IotHubConnectionString do not exist then the following environment variables must be defined: IOTEDGE_WORKLOADURI, IOTEDGE_DEVICEID, IOTEDGE_MODULEID, IOTEDGE_IOTHUBHOSTNAME, IOTEDGE_AUTHSCHEME and IOTEDGE_MODULEGENERATIONID. ]
|
||||
|
||||
const keys = [
|
||||
'IOTEDGE_WORKLOADURI',
|
||||
'IOTEDGE_DEVICEID',
|
||||
'IOTEDGE_MODULEID',
|
||||
'IOTEDGE_IOTHUBHOSTNAME',
|
||||
'IOTEDGE_AUTHSCHEME',
|
||||
'IOTEDGE_MODULEGENERATIONID'
|
||||
];
|
||||
|
||||
for (const key of keys) {
|
||||
if (!process.env[key]) {
|
||||
return new ReferenceError(`Environment variable ${key} was not provided.`);
|
||||
}
|
||||
static fromAuthenticationProvider(authenticationProvider: AuthenticationProvider, transportCtor: any): any {
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_089: [The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `authenticationProvider` argument is falsy.]*/
|
||||
if (!authenticationProvider) {
|
||||
throw new ReferenceError('authenticationMethod cannot be \'' + authenticationProvider + '\'');
|
||||
}
|
||||
|
||||
// Codes_SRS_NODE_MODULE_CLIENT_13_030: [ The value for the environment variable IOTEDGE_AUTHSCHEME must be sasToken. ]
|
||||
|
||||
// we only support sas token auth scheme at this time
|
||||
if (process.env.IOTEDGE_AUTHSCHEME.toLowerCase() !== 'sastoken') {
|
||||
return new ReferenceError(
|
||||
`Authentication scheme ${
|
||||
process.env.IOTEDGE_AUTHSCHEME
|
||||
} is not a supported scheme.`
|
||||
);
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_092: [The `fromAuthenticationProvider` method shall throw a `ReferenceError` if the `transportCtor` argument is falsy.]*/
|
||||
if (!transportCtor) {
|
||||
throw new ReferenceError('transportCtor cannot be \'' + transportCtor + '\'');
|
||||
}
|
||||
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_090: [The `fromAuthenticationProvider` method shall pass the `authenticationProvider` object passed as argument to the transport constructor.]*/
|
||||
/*Codes_SRS_NODE_MODULE_CLIENT_16_091: [The `fromAuthenticationProvider` method shall return a `Client` object configured with a new instance of a transport created using the `transportCtor` argument.]*/
|
||||
return new ModuleClient(new transportCtor(authenticationProvider), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description Creates an IoT Hub module client by using configuration
|
||||
* information from the environment. If an environment
|
||||
* variable called `EdgeHubConnectionString` or `IotHubConnectionString`
|
||||
* exists, then that value is used and behavior is identical
|
||||
* to calling `fromConnectionString` passing that in. If
|
||||
* those environment variables do not exist then the following
|
||||
* variables MUST be defined:
|
||||
* IOTEDGE_WORKLOADURI - URI for iotedged's workload API
|
||||
* IOTEDGE_DEVICEID - Device identifier
|
||||
* IOTEDGE_MODULEID - Module identifier
|
||||
* IOTEDGE_MODULEGENERATIONID - Module generation identifier
|
||||
* IOTEDGE_IOTHUBHOSTNAME - IoT Hub host name
|
||||
* IOTEDGE_AUTHSCHEME - Authentication scheme to use;
|
||||
* must be "sasToken"
|
||||
* Creates an IoT Hub module client by using configuration information from the environment.
|
||||
*
|
||||
* If an environment variable called `EdgeHubConnectionString` or `IotHubConnectionString` exists, then that value is used and behavior is identical
|
||||
* to calling `fromConnectionString` passing that in. If those environment variables do not exist then the following variables MUST be defined:
|
||||
*
|
||||
* - IOTEDGE_WORKLOADURI URI for iotedged's workload API
|
||||
* - IOTEDGE_DEVICEID Device identifier
|
||||
* - IOTEDGE_MODULEID Module identifier
|
||||
* - IOTEDGE_MODULEGENERATIONID Module generation identifier
|
||||
* - IOTEDGE_IOTHUBHOSTNAME IoT Hub host name
|
||||
* - IOTEDGE_AUTHSCHEME Authentication scheme to use; must be "sasToken"
|
||||
*
|
||||
* @param transportCtor Transport protocol used to connect to IoT hub.
|
||||
* @param callback Callback to invoke when the ModuleClient has been constructured or if an
|
||||
* error occurs while creating the client.
|
||||
|
@ -353,4 +320,35 @@ export class ModuleClient extends EventEmitter {
|
|||
wrappedTransportCtor
|
||||
));
|
||||
}
|
||||
|
||||
private static validateEnvironment(): ReferenceError {
|
||||
// Codes_SRS_NODE_MODULE_CLIENT_13_029: [ If environment variables EdgeHubConnectionString and IotHubConnectionString do not exist then the following environment variables must be defined: IOTEDGE_WORKLOADURI, IOTEDGE_DEVICEID, IOTEDGE_MODULEID, IOTEDGE_IOTHUBHOSTNAME, IOTEDGE_AUTHSCHEME and IOTEDGE_MODULEGENERATIONID. ]
|
||||
const keys = [
|
||||
'IOTEDGE_WORKLOADURI',
|
||||
'IOTEDGE_DEVICEID',
|
||||
'IOTEDGE_MODULEID',
|
||||
'IOTEDGE_IOTHUBHOSTNAME',
|
||||
'IOTEDGE_AUTHSCHEME',
|
||||
'IOTEDGE_MODULEGENERATIONID'
|
||||
];
|
||||
|
||||
for (const key of keys) {
|
||||
if (!process.env[key]) {
|
||||
return new ReferenceError(
|
||||
`Environment variable ${key} was not provided.`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Codes_SRS_NODE_MODULE_CLIENT_13_030: [ The value for the environment variable IOTEDGE_AUTHSCHEME must be sasToken. ]
|
||||
|
||||
// we only support sas token auth scheme at this time
|
||||
if (process.env.IOTEDGE_AUTHSCHEME.toLowerCase() !== 'sastoken') {
|
||||
return new ReferenceError(
|
||||
`Authentication scheme ${
|
||||
process.env.IOTEDGE_AUTHSCHEME
|
||||
} is not a supported scheme.`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
'use strict';
|
||||
|
||||
var EventEmitter = require('events').EventEmitter;
|
||||
var assert = require('chai').assert;
|
||||
var sinon = require('sinon');
|
||||
|
||||
var DeviceClient = require('../lib/device_client.js').Client;
|
||||
var ModuleClient = require('../lib/module_client.js').ModuleClient;
|
||||
var results = require('azure-iot-common').results;
|
||||
var Message = require('azure-iot-common').Message;
|
||||
var errors = require('azure-iot-common').errors;
|
||||
var ExponentialBackOffWithJitter = require('azure-iot-common').ExponentialBackOffWithJitter;
|
||||
|
||||
describe('DeviceClient Retry Logic', function () {
|
||||
it('retries to receive cloud-to-device message', function(testCallback) {
|
||||
var fakeTransport = new EventEmitter();
|
||||
var fakeBlobClient = { updateSharedAccessSignature: function () {} };
|
||||
sinon.spy(fakeTransport, 'on');
|
||||
fakeTransport.enableC2D = sinon.stub().callsArgWith(0, new errors.TimeoutError('failed'));
|
||||
|
||||
var client = new DeviceClient(fakeTransport, null, fakeBlobClient);
|
||||
client._maxOperationTimeout = 100;
|
||||
client.on('error', (err) => {
|
||||
assert(fakeTransport.enableC2D.callCount >= 2);
|
||||
testCallback();
|
||||
});
|
||||
client.on('message', function() {});
|
||||
});
|
||||
});
|
||||
|
||||
[DeviceClient, ModuleClient].forEach(function (ClientCtor) {
|
||||
describe(ClientCtor.name + ' Retry Logic', function () {
|
||||
[
|
||||
{
|
||||
funcName: 'sendEvent',
|
||||
funcParam: new Message('foo')
|
||||
},
|
||||
{
|
||||
funcName: 'sendEventBatch',
|
||||
funcParam: [new Message('1'), new Message('2')]
|
||||
},
|
||||
{
|
||||
funcName: 'updateSharedAccessSignature',
|
||||
funcParam: 'fakeSasToken'
|
||||
},
|
||||
{
|
||||
funcName: 'complete',
|
||||
funcParam: new Message('foo')
|
||||
},
|
||||
{
|
||||
funcName: 'reject',
|
||||
funcParam: new Message('foo')
|
||||
},
|
||||
{
|
||||
funcName: 'abandon',
|
||||
funcParam: new Message('foo')
|
||||
},
|
||||
{
|
||||
funcName: 'setOptions',
|
||||
funcParam: {}
|
||||
}
|
||||
].forEach(function (testConfig) {
|
||||
it('retries to ' + testConfig.funcName, function(testCallback) {
|
||||
var fakeTransport = new EventEmitter();
|
||||
var fakeBlobClient = { updateSharedAccessSignature: function () {} };
|
||||
fakeTransport[testConfig.funcName] = sinon.stub().callsArgWith(1, new errors.TimeoutError('failed'));
|
||||
|
||||
var client = new ClientCtor(fakeTransport, null, fakeBlobClient);
|
||||
client._maxOperationTimeout = 100;
|
||||
client[testConfig.funcName](testConfig.funcParam, function () {
|
||||
assert(fakeTransport[testConfig.funcName].callCount >= 2);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('retries to open/connect', function(testCallback) {
|
||||
var fakeTransport = new EventEmitter();
|
||||
var fakeBlobClient = { updateSharedAccessSignature: function () {} };
|
||||
fakeTransport.connect = sinon.stub().callsArgWith(0, new errors.TimeoutError('failed'));
|
||||
|
||||
var client = new ClientCtor(fakeTransport, null, fakeBlobClient);
|
||||
client._maxOperationTimeout = 100;
|
||||
client.open(function (err) {
|
||||
assert(fakeTransport.connect.callCount >= 2);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
it('retries to enable device methods', function(testCallback) {
|
||||
var fakeTransport = new EventEmitter();
|
||||
var fakeBlobClient = { updateSharedAccessSignature: function () {} };
|
||||
fakeTransport.onDeviceMethod = sinon.stub();
|
||||
fakeTransport.enableMethods = sinon.stub().callsArgWith(0, new errors.TimeoutError('failed'));
|
||||
|
||||
var client = new ClientCtor(fakeTransport, null, fakeBlobClient);
|
||||
client._maxOperationTimeout = 100;
|
||||
client.on('error', (err) => {
|
||||
assert(fakeTransport.onDeviceMethod.calledOnce);
|
||||
assert(fakeTransport.enableMethods.callCount >= 2);
|
||||
testCallback();
|
||||
});
|
||||
client.onDeviceMethod('methodName', function () {});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -1,116 +0,0 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
'use strict';
|
||||
/*jshint esversion: 6 */
|
||||
|
||||
var assert = require('chai').assert;
|
||||
var sinon = require('sinon');
|
||||
var Client = require('../lib/device_client').Client;
|
||||
var ModuleClient = require('../lib/module_client').ModuleClient;
|
||||
var InternalClient = require('../lib/internal_client').InternalClient;
|
||||
var FakeTransport = require('./fake_transport');
|
||||
var Message = require('azure-iot-common').Message;
|
||||
var NoRetry = require('azure-iot-common').NoRetry;
|
||||
var errors = require('azure-iot-common').errors;
|
||||
|
||||
var fakeMethodName = '__FAKE_METHOD_NAME__';
|
||||
var fakeSignature = '__FAKE_SIG__';
|
||||
var fakeMessage = new Message('__FAKE__');
|
||||
var fakeOptions = { fake: true };
|
||||
var fakeStreamName = '__FAKE_STREAM_NAME__';
|
||||
var fakeStream = '__FAKE_STREAM__';
|
||||
var fakeStreamLength = fakeStream.length;
|
||||
var fakeOutputName = '__FAKE_OUTPUT__';
|
||||
var fakeEventName = '__FAKE_EVENT__';
|
||||
var fakeEventListener = function() {};
|
||||
|
||||
var supportedMethods = {
|
||||
Client: {
|
||||
// common
|
||||
removeListener: (instance, done) => { instance.addListener(fakeEventName, fakeEventListener); instance.removeListener(fakeEventName, fakeEventListener); setTimeout(done, 400); },
|
||||
onDeviceMethod: (instance, done) => { instance.onDeviceMethod(fakeMethodName, (request,response) => {}); done(); },
|
||||
updateSharedAccessSignature: (instance, done) => instance.updateSharedAccessSignature(fakeSignature, done),
|
||||
open: (instance, done) => instance.open(done),
|
||||
close: (instance, done) => instance.close(done),
|
||||
setOptions: (instance, done) => instance.setOptions(fakeOptions, done),
|
||||
complete: (instance, done) => instance.complete(fakeMessage, done),
|
||||
reject: (instance, done) => instance.reject(fakeMessage, done),
|
||||
abandon: (instance, done) => instance.abandon(fakeMessage, done),
|
||||
getTwin: (instance, done) => instance.getTwin(done),
|
||||
setRetryPolicy: (instance, done) => { instance.setRetryPolicy(new NoRetry()); done(); },
|
||||
// unique to Client
|
||||
uploadToBlob: (instance, done) => instance.uploadToBlob(fakeStreamName, fakeStream, fakeStreamLength, done),
|
||||
setTransportOptions: (instance, done) => instance.setTransportOptions(fakeOptions, done),
|
||||
sendEvent: (instance, done) => instance.sendEvent(fakeMessage, done),
|
||||
sendEventBatch: (instance, done) => instance.sendEventBatch([fakeMessage], done),
|
||||
},
|
||||
ModuleClient: {
|
||||
// common
|
||||
removeListener: (instance, done) => { instance.addListener(fakeEventName, fakeEventListener); instance.removeListener(fakeEventName, fakeEventListener); setTimeout(done, 400); },
|
||||
onDeviceMethod: (instance, done) => { instance.onDeviceMethod(fakeMethodName, (request,response) => {}); done(); },
|
||||
updateSharedAccessSignature: (instance, done) => instance.updateSharedAccessSignature(fakeSignature, done),
|
||||
open: (instance, done) => instance.open(done),
|
||||
close: (instance, done) => instance.close(done),
|
||||
setOptions: (instance, done) => instance.setOptions(fakeOptions, done),
|
||||
complete: (instance, done) => instance.complete(fakeMessage, done),
|
||||
reject: (instance, done) => instance.reject(fakeMessage, done),
|
||||
abandon: (instance, done) => instance.abandon(fakeMessage, done),
|
||||
getTwin: (instance, done) => instance.getTwin(done),
|
||||
setRetryPolicy: (instance, done) => { instance.setRetryPolicy(new NoRetry()); done(); },
|
||||
// unique to ModuleClient
|
||||
sendOutputEvent: (instance, done) => instance.sendOutputEvent(fakeOutputName, fakeMessage, done),
|
||||
sendOutputEventBatch: (instance, done) => instance.sendOutputEventBatch(fakeOutputName, [fakeMessage], done),
|
||||
}
|
||||
};
|
||||
|
||||
var unsupportedEvents = {
|
||||
Client: [ 'inputMessage' ],
|
||||
ModuleClient: [ 'message' ]
|
||||
};
|
||||
|
||||
[Client, ModuleClient].forEach(function(objectUnderTest) {
|
||||
describe(objectUnderTest.name, function() {
|
||||
this.timeout(1000);
|
||||
|
||||
var client;
|
||||
var fakeConnectionString = "HostName=_H_;DeviceId=_D_;SharedAccessKey=_K_";
|
||||
|
||||
beforeEach(function() {
|
||||
client = new objectUnderTest.fromConnectionString(fakeConnectionString, FakeTransport);
|
||||
if (client._internalClient.blobUploadClient) {
|
||||
client._internalClient.blobUploadClient.uploadToBlob = sinon.stub().callsArg(3);
|
||||
}
|
||||
});
|
||||
|
||||
afterEach(function() {
|
||||
client = null;
|
||||
});
|
||||
|
||||
it ('constructor creates an InternalClient object', function() {
|
||||
assert.instanceOf(client._internalClient, InternalClient);
|
||||
});
|
||||
|
||||
var methods = supportedMethods[objectUnderTest.name];
|
||||
Object.keys(methods).forEach((fname) => {
|
||||
it(fname + ' calls InternalBase.' + fname, function(done) {
|
||||
sinon.spy(client, fname);
|
||||
sinon.spy(client._internalClient, fname);
|
||||
methods[fname](client, function() {
|
||||
assert(client._internalClient[fname].called);
|
||||
assert.deepEqual(client[fname].firstCall.args, client._internalClient[fname].firstCall.args);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
unsupportedEvents[objectUnderTest.name].forEach(function(eventName) {
|
||||
it ('on(\'' + eventName + '\' throws ArgumentError', function() {
|
||||
assert.throws(function() {
|
||||
client.emit('newListener', eventName, function() {});
|
||||
}, errors.ArgumentError);
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
});
|
|
@ -0,0 +1,257 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
'use strict';
|
||||
|
||||
var assert = require('chai').assert;
|
||||
var sinon = require('sinon');
|
||||
var stream = require('stream');
|
||||
var EventEmitter = require('events').EventEmitter;
|
||||
var FakeTransport = require('./fake_transport.js');
|
||||
var Message = require('azure-iot-common').Message;
|
||||
var errors = require('azure-iot-common').errors;
|
||||
var results = require('azure-iot-common').results;
|
||||
var X509AuthenticationProvider = require('../lib/x509_authentication_provider').X509AuthenticationProvider;
|
||||
var Client = require('../lib/device_client').Client;
|
||||
|
||||
describe('Device Client', function () {
|
||||
var sharedKeyConnectionString = 'HostName=host;DeviceId=id;SharedAccessKey=key';
|
||||
var sharedAccessSignature = '"SharedAccessSignature sr=hubName.azure-devices.net/devices/deviceId&sig=s1gn4tur3&se=1454204843"';
|
||||
|
||||
describe('#fromConnectionString', function () {
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_05_006: [The fromConnectionString method shall return a new instance of the Client object, as by a call to new Client(new Transport(...)).]*/
|
||||
it('returns an instance of Client', function () {
|
||||
var client = Client.fromConnectionString(sharedKeyConnectionString, FakeTransport);
|
||||
assert.instanceOf(client, Client);
|
||||
});
|
||||
|
||||
it('doesn\'t try to renew the SAS token when using x509', function (testCallback) {
|
||||
this.clock = sinon.useFakeTimers();
|
||||
var clock = this.clock;
|
||||
|
||||
var x509ConnectionString = 'HostName=host;DeviceId=id;x509=true';
|
||||
var client = new Client.fromConnectionString(x509ConnectionString, FakeTransport);
|
||||
assert.instanceOf(client, Client);
|
||||
|
||||
sinon.stub(client._transport, 'updateSharedAccessSignature').callsFake(function () {
|
||||
clock.restore();
|
||||
testCallback(new Error('updateSharedAccessSignature should not have been called'));
|
||||
});
|
||||
|
||||
this.clock.tick(3600000); // 1 hour: this should trigger the call to renew the SAS token.
|
||||
clock.restore();
|
||||
testCallback();
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_093: [The `fromConnectionString` method shall create a new `X509AuthorizationProvider` object with the connection string passed as argument if it contains an X509 parameter and pass this object to the transport constructor.]*/
|
||||
it('creates a X509AuthenticationProvider and passes it to the transport', function (testCallback) {
|
||||
var x509ConnectionString = 'HostName=host;DeviceId=id;x509=true';
|
||||
Client.fromConnectionString(x509ConnectionString, function (authProvider) {
|
||||
assert.instanceOf(authProvider, X509AuthenticationProvider);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('#fromSharedAccessSignature', function () {
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_030: [The fromSharedAccessSignature method shall return a new instance of the Client object] */
|
||||
it('returns an instance of Client', function () {
|
||||
var client = Client.fromSharedAccessSignature(sharedAccessSignature, FakeTransport);
|
||||
assert.instanceOf(client, Client);
|
||||
});
|
||||
});
|
||||
|
||||
describe('#fromAuthenticationProvider', function () {
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_091: [The `fromAuthenticationProvider` method shall return a `Client` object configured with a new instance of a transport created using the `transportCtor` argument.]*/
|
||||
it('returns an instance of Client', function () {
|
||||
var client = Client.fromAuthenticationProvider({}, FakeTransport);
|
||||
assert.instanceOf(client, Client);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
describe('#uploadToBlob', function() {
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_037: [The `uploadToBlob` method shall throw a `ReferenceError` if `blobName` is falsy.]*/
|
||||
[undefined, null, ''].forEach(function (blobName) {
|
||||
it('throws a ReferenceError if \'blobName\' is ' + blobName + '\'', function() {
|
||||
var client = new Client(new EventEmitter(), null, {});
|
||||
assert.throws(function() {
|
||||
client.uploadToBlob(blobName, new stream.Readable(), 42, function() {});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_038: [The `uploadToBlob` method shall throw a `ReferenceError` if `stream` is falsy.]*/
|
||||
[undefined, null, ''].forEach(function (stream) {
|
||||
it('throws a ReferenceError if \'stream\' is ' + stream + '\'', function() {
|
||||
var client = new Client(new EventEmitter(), null, {});
|
||||
assert.throws(function() {
|
||||
client.uploadToBlob('blobName', stream, 42, function() {});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_039: [The `uploadToBlob` method shall throw a `ReferenceError` if `streamLength` is falsy.]*/
|
||||
[undefined, null, '', 0].forEach(function (streamLength) {
|
||||
it('throws a ReferenceError if \'streamLength\' is ' + streamLength + '\'', function() {
|
||||
var client = new Client(new EventEmitter(), null, {});
|
||||
assert.throws(function() {
|
||||
client.uploadToBlob('blobName', new stream.Readable(), streamLength, function() {});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_040: [The `uploadToBlob` method shall call the `done` callback with an `Error` object if the upload fails.]*/
|
||||
it('calls the done callback with an Error object if the upload fails', function(done) {
|
||||
var FakeBlobUploader = function () {
|
||||
this.uploadToBlob = function(blobName, stream, streamLength, callback) {
|
||||
callback(new Error('fake error'));
|
||||
};
|
||||
};
|
||||
|
||||
var client = new Client(new EventEmitter(), null, new FakeBlobUploader());
|
||||
client.uploadToBlob('blobName', new stream.Readable(), 42, function(err) {
|
||||
assert.instanceOf(err, Error);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_041: [The `uploadToBlob` method shall call the `done` callback no parameters if the upload succeeds.]*/
|
||||
it('calls the done callback with no parameters if the upload succeeded', function (done) {
|
||||
var FakeBlobUploader = function () {
|
||||
this.uploadToBlob = function(blobName, stream, streamLength, callback) {
|
||||
callback();
|
||||
};
|
||||
};
|
||||
|
||||
var client = new Client(new EventEmitter(), null, new FakeBlobUploader());
|
||||
client.uploadToBlob('blobName', new stream.Readable(), 42, done);
|
||||
});
|
||||
});
|
||||
|
||||
describe('#on(\'message\')', function () {
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_002: [The ‘message’ event shall be emitted when a cloud-to-device message is received from the IoT Hub service.]*/
|
||||
it('emits a message event when a message is received', function (done) {
|
||||
var fakeTransport = new FakeTransport();
|
||||
var client = new Client(fakeTransport);
|
||||
client.on('message', function(msg) {
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_003: [The ‘message’ event parameter shall be a ‘Message’ object.]*/
|
||||
assert.equal(msg.constructor.name, 'Message');
|
||||
done();
|
||||
});
|
||||
|
||||
fakeTransport.emit('message', new Message());
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_004: [The client shall start listening for messages from the service whenever there is a listener subscribed to the ‘message’ event.]*/
|
||||
it('starts listening for messages when a listener subscribes to the message event', function () {
|
||||
var fakeTransport = new FakeTransport();
|
||||
sinon.spy(fakeTransport, 'enableC2D');
|
||||
var client = new Client(fakeTransport);
|
||||
|
||||
// Calling 'on' twice to make sure it's called only once on the receiver.
|
||||
// It works because the test will fail if the test callback is called multiple times, and it's called for every time the 'message' event is subscribed on the receiver.
|
||||
client.on('message', function () { });
|
||||
client.on('message', function () { });
|
||||
assert.isTrue(fakeTransport.enableC2D.calledOnce);
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_005: [The client shall stop listening for messages from the service whenever the last listener unsubscribes from the ‘message’ event.]*/
|
||||
it('stops listening for messages when the last listener has unsubscribed', function (testCallback) {
|
||||
var fakeTransport = new FakeTransport();
|
||||
sinon.spy(fakeTransport, 'enableC2D');
|
||||
sinon.spy(fakeTransport, 'disableC2D');
|
||||
sinon.spy(fakeTransport, 'removeAllListeners');
|
||||
|
||||
var client = new Client(fakeTransport);
|
||||
var listener1 = function () { };
|
||||
var listener2 = function () { };
|
||||
client.on('message', listener1);
|
||||
client.on('message', listener2);
|
||||
|
||||
process.nextTick(function() {
|
||||
client.removeListener('message', listener1);
|
||||
assert.isTrue(fakeTransport.disableC2D.notCalled);
|
||||
client.removeListener('message', listener2);
|
||||
assert(fakeTransport.disableC2D.calledOnce);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_066: [ The client shall emit an error if connecting the transport fails while subscribing to message events ]*/
|
||||
it('emits an error if it fails to start listening for messages', function (testCallback) {
|
||||
var fakeTransport = new FakeTransport();
|
||||
var fakeError = new Error('fake');
|
||||
sinon.stub(fakeTransport, 'enableC2D').callsFake(function (callback) { callback(fakeError); });
|
||||
var client = new Client(fakeTransport);
|
||||
client.on('error', function (err) {
|
||||
assert.strictEqual(err, fakeError);
|
||||
testCallback();
|
||||
})
|
||||
|
||||
// Calling 'on' twice to make sure it's called only once on the receiver.
|
||||
// It works because the test will fail if the test callback is called multiple times, and it's called for every time the 'message' event is subscribed on the receiver.
|
||||
client.on('message', function () { });
|
||||
assert.isTrue(fakeTransport.enableC2D.calledOnce);
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_066: [ The client shall emit an error if connecting the transport fails while subscribing to message events ]*/
|
||||
it('emits an error if it fails to stop listening for messages', function (testCallback) {
|
||||
var fakeTransport = new FakeTransport();
|
||||
var fakeError = new Error('fake');
|
||||
sinon.spy(fakeTransport, 'enableC2D');
|
||||
sinon.stub(fakeTransport, 'disableC2D').callsFake(function (callback) { callback(fakeError); });
|
||||
var client = new Client(fakeTransport);
|
||||
client.on('error', function (err) {
|
||||
assert.strictEqual(err, fakeError);
|
||||
testCallback();
|
||||
})
|
||||
|
||||
client.on('message', function () { });
|
||||
assert.isTrue(fakeTransport.enableC2D.calledOnce);
|
||||
client.removeAllListeners('message');
|
||||
assert.isTrue(fakeTransport.disableC2D.calledOnce);
|
||||
});
|
||||
});
|
||||
|
||||
describe('transport.on(\'disconnect\') handler', function () {
|
||||
var fakeTransport, fakeRetryPolicy;
|
||||
beforeEach(function () {
|
||||
fakeRetryPolicy = {
|
||||
shouldRetry: function () { return true; },
|
||||
nextRetryTimeout: function () { return 1; }
|
||||
};
|
||||
|
||||
fakeTransport = new EventEmitter();
|
||||
fakeTransport.enableC2D = sinon.stub().callsArg(0);
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_097: [If the transport emits a `disconnect` event while the client is subscribed to c2d messages updates the retry policy shall be used to reconnect and re-enable the feature using the transport `enableC2D` method.]*/
|
||||
it('reenables C2D after being disconnected if C2D was enabled', function () {
|
||||
var client = new Client(fakeTransport);
|
||||
client.setRetryPolicy(fakeRetryPolicy);
|
||||
client.on('message', function () {});
|
||||
assert.isTrue(fakeTransport.enableC2D.calledOnce);
|
||||
fakeTransport.emit('disconnect', new errors.TimeoutError()); // timeouts can be retried
|
||||
assert.isTrue(fakeTransport.enableC2D.calledTwice);
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_DEVICE_CLIENT_16_102: [If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
|
||||
it('emits a disconnect event if reenabling C2D fails', function (testCallback) {
|
||||
var fakeError = new Error('fake');
|
||||
var client = new Client(fakeTransport);
|
||||
client.on('disconnect', function (err) {
|
||||
assert.instanceOf(err, results.Disconnected);
|
||||
assert.strictEqual(err.transportObj, fakeError);
|
||||
testCallback();
|
||||
});
|
||||
|
||||
client.setRetryPolicy(fakeRetryPolicy);
|
||||
client._maxOperationTimeout = 1;
|
||||
client.on('message', function () {});
|
||||
assert.isTrue(fakeTransport.enableC2D.calledOnce);
|
||||
fakeTransport.enableC2D = sinon.stub().callsArgWith(0, fakeError);
|
||||
fakeTransport.emit('disconnect', new errors.TimeoutError()); // timeouts can be retried
|
||||
});
|
||||
});
|
||||
});
|
|
@ -1,106 +0,0 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
'use strict';
|
||||
|
||||
var EventEmitter = require('events').EventEmitter;
|
||||
var assert = require('chai').assert;
|
||||
var sinon = require('sinon');
|
||||
|
||||
var InternalClient = require('../lib/internal_client.js').InternalClient;
|
||||
var results = require('azure-iot-common').results;
|
||||
var Message = require('azure-iot-common').Message;
|
||||
var errors = require('azure-iot-common').errors;
|
||||
var ExponentialBackOffWithJitter = require('azure-iot-common').ExponentialBackOffWithJitter;
|
||||
|
||||
|
||||
describe('InternalClient Retry Logic', function () {
|
||||
[
|
||||
{
|
||||
funcName: 'sendEvent',
|
||||
funcParam: new Message('foo')
|
||||
},
|
||||
{
|
||||
funcName: 'sendEventBatch',
|
||||
funcParam: [new Message('1'), new Message('2')]
|
||||
},
|
||||
{
|
||||
funcName: 'updateSharedAccessSignature',
|
||||
funcParam: 'fakeSasToken'
|
||||
},
|
||||
{
|
||||
funcName: 'complete',
|
||||
funcParam: new Message('foo')
|
||||
},
|
||||
{
|
||||
funcName: 'reject',
|
||||
funcParam: new Message('foo')
|
||||
},
|
||||
{
|
||||
funcName: 'abandon',
|
||||
funcParam: new Message('foo')
|
||||
},
|
||||
{
|
||||
funcName: 'setOptions',
|
||||
funcParam: {}
|
||||
}
|
||||
].forEach(function (testConfig) {
|
||||
it('retries to ' + testConfig.funcName, function(testCallback) {
|
||||
var fakeTransport = new EventEmitter();
|
||||
var fakeBlobClient = { updateSharedAccessSignature: function () {} };
|
||||
fakeTransport[testConfig.funcName] = sinon.stub().callsArgWith(1, new errors.TimeoutError('failed'));
|
||||
|
||||
var client = new InternalClient(fakeTransport, null, fakeBlobClient);
|
||||
client._maxOperationTimeout = 100;
|
||||
client[testConfig.funcName](new Message('foo'), function (err) {
|
||||
assert(fakeTransport[testConfig.funcName].callCount >= 2);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('retries to open/connect', function(testCallback) {
|
||||
var fakeTransport = new EventEmitter();
|
||||
var fakeBlobClient = { updateSharedAccessSignature: function () {} };
|
||||
fakeTransport.connect = sinon.stub().callsArgWith(0, new errors.TimeoutError('failed'));
|
||||
|
||||
var client = new InternalClient(fakeTransport, null, fakeBlobClient);
|
||||
client._maxOperationTimeout = 100;
|
||||
client.open(function (err) {
|
||||
assert(fakeTransport.connect.callCount >= 2);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
it('retries to enable device methods', function(testCallback) {
|
||||
var fakeTransport = new EventEmitter();
|
||||
var fakeBlobClient = { updateSharedAccessSignature: function () {} };
|
||||
fakeTransport.onDeviceMethod = sinon.stub();
|
||||
fakeTransport.enableMethods = sinon.stub().callsArgWith(0, new errors.TimeoutError('failed'));
|
||||
|
||||
var client = new InternalClient(fakeTransport, null, fakeBlobClient);
|
||||
client._maxOperationTimeout = 100;
|
||||
client.on('error', (err) => {
|
||||
assert(fakeTransport.onDeviceMethod.calledOnce);
|
||||
assert(fakeTransport.enableMethods.callCount >= 2);
|
||||
testCallback();
|
||||
});
|
||||
client.onDeviceMethod('methodName', function () {});
|
||||
});
|
||||
|
||||
|
||||
it('retries to receive cloud-to-device message', function(testCallback) {
|
||||
var fakeTransport = new EventEmitter();
|
||||
var fakeBlobClient = { updateSharedAccessSignature: function () {} };
|
||||
sinon.spy(fakeTransport, 'on');
|
||||
fakeTransport.enableC2D = sinon.stub().callsArgWith(0, new errors.TimeoutError('failed'));
|
||||
|
||||
var client = new InternalClient(fakeTransport, null, fakeBlobClient);
|
||||
client._maxOperationTimeout = 100;
|
||||
client.on('error', (err) => {
|
||||
assert(fakeTransport.enableC2D.callCount >= 2);
|
||||
testCallback();
|
||||
});
|
||||
client.on('message', function() {});
|
||||
})
|
||||
});
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -5,11 +5,44 @@
|
|||
|
||||
var assert = require('chai').assert;
|
||||
var sinon = require('sinon');
|
||||
var FakeTransport = require('./fake_transport.js');
|
||||
var Message = require('azure-iot-common').Message;
|
||||
var ModuleClient = require('../lib/module_client').ModuleClient;
|
||||
var errors = require('azure-iot-common').errors;
|
||||
var ModuleClient = require('../lib/module_client').ModuleClient;
|
||||
var IotEdgeAuthenticationProvider = require('../lib/iotedge_authentication_provider').IotEdgeAuthenticationProvider;
|
||||
var SharedAccessSignature = require('azure-iot-common').SharedAccessSignature;
|
||||
|
||||
describe('ModuleClient', function() {
|
||||
describe('ModuleClient', function () {
|
||||
var sharedKeyConnectionString = 'HostName=host;DeviceId=id;ModuleId=modId;SharedAccessKey=key';
|
||||
var sharedAccessSignature = '"SharedAccessSignature sr=hubName.azure-devices.net/devices/deviceId/modules/moduleId&sig=s1gn4tur3&se=1454204843"';
|
||||
|
||||
describe('#fromConnectionString', function () {
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_05_006: [The fromConnectionString method shall return a new instance of the ModuleClient object, as by a call to new ModuleClient(new Transport(...)).]*/
|
||||
it('returns an instance of ModuleClient', function () {
|
||||
var client = ModuleClient.fromConnectionString(sharedKeyConnectionString, FakeTransport);
|
||||
assert.instanceOf(client, ModuleClient);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
describe('#fromSharedAccessSignature', function () {
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_16_030: [The fromSharedAccessSignature method shall return a new instance of the ModuleClient object] */
|
||||
it('returns an instance of ModuleClient', function () {
|
||||
var client = ModuleClient.fromSharedAccessSignature(sharedAccessSignature, FakeTransport);
|
||||
assert.instanceOf(client, ModuleClient);
|
||||
});
|
||||
});
|
||||
|
||||
describe('#fromAuthenticationProvider', function () {
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_16_091: [The `fromAuthenticationProvider` method shall return a `ModuleClient` object configured with a new instance of a transport created using the `transportCtor` argument.]*/
|
||||
it('returns an instance of ModuleClient', function () {
|
||||
var client = ModuleClient.fromAuthenticationProvider({}, FakeTransport);
|
||||
assert.instanceOf(client, ModuleClient);
|
||||
assert.instanceOf(client._transport, FakeTransport);
|
||||
});
|
||||
});
|
||||
|
||||
describe('#fromEnvironment', function() {
|
||||
// Tests_SRS_NODE_MODULE_CLIENT_13_033: [ The fromEnvironment method shall throw a ReferenceError if the callback argument is falsy or is not a function. ]
|
||||
[null, undefined, 'not a function', 20].forEach(function(badCallback) {
|
||||
|
@ -58,7 +91,7 @@ describe('ModuleClient', function() {
|
|||
['EdgeHubConnectionString', 'IotHubConnectionString'].forEach(function(envName) {
|
||||
describe('sets CA cert in non-edge mode', function() {
|
||||
var stub;
|
||||
|
||||
|
||||
var transport = {
|
||||
setOptions: function() {}
|
||||
};
|
||||
|
@ -265,7 +298,7 @@ describe('ModuleClient', function() {
|
|||
|
||||
var getTrustBundleStub;
|
||||
var createWithSigningFunctionStub;
|
||||
|
||||
|
||||
beforeEach(function() {
|
||||
env.forEach(function(e) {
|
||||
process.env[e[0]] = e[1];
|
||||
|
@ -277,7 +310,7 @@ describe('ModuleClient', function() {
|
|||
createWithSigningFunctionStub = sinon.stub(SharedAccessSignature, 'createWithSigningFunction')
|
||||
.callsArgWith(3, null, 'sas token');
|
||||
});
|
||||
|
||||
|
||||
afterEach(function() {
|
||||
env.forEach(function(e) {
|
||||
delete process.env[e[0]];
|
||||
|
@ -286,7 +319,7 @@ describe('ModuleClient', function() {
|
|||
getTrustBundleStub.restore();
|
||||
createWithSigningFunctionStub.restore();
|
||||
});
|
||||
|
||||
|
||||
// Tests_SRS_NODE_MODULE_CLIENT_13_035: [ If the client is running in edge mode then the IotEdgeAuthenticationProvider.getTrustBundle method shall be invoked to retrieve the CA cert and the returned value shall be set as the CA cert for the transport via the transport's setOptions method passing in the CA value for the ca property in the options object. ]
|
||||
it('sets cert on transport', function(testCallback) {
|
||||
var transport = {
|
||||
|
@ -330,7 +363,7 @@ describe('ModuleClient', function() {
|
|||
|
||||
var getTrustBundleStub;
|
||||
var createWithSigningFunctionStub;
|
||||
|
||||
|
||||
beforeEach(function() {
|
||||
env.forEach(function(e) {
|
||||
process.env[e[0]] = e[1];
|
||||
|
@ -338,11 +371,11 @@ describe('ModuleClient', function() {
|
|||
|
||||
getTrustBundleStub = sinon.stub(IotEdgeAuthenticationProvider.prototype, 'getTrustBundle')
|
||||
.callsArgWith(0, 'whoops');
|
||||
|
||||
|
||||
createWithSigningFunctionStub = sinon.stub(SharedAccessSignature, 'createWithSigningFunction')
|
||||
.callsArgWith(3, null, 'sas token');
|
||||
});
|
||||
|
||||
|
||||
afterEach(function() {
|
||||
env.forEach(function(e) {
|
||||
delete process.env[e[0]];
|
||||
|
@ -351,7 +384,7 @@ describe('ModuleClient', function() {
|
|||
getTrustBundleStub.restore();
|
||||
createWithSigningFunctionStub.restore();
|
||||
});
|
||||
|
||||
|
||||
it('fails if getTrustBundle fails', function(testCallback) {
|
||||
var transport = {
|
||||
on: sinon.stub(),
|
||||
|
@ -368,4 +401,115 @@ describe('ModuleClient', function() {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
['sendOutputEvent', 'sendOutputEventBatch'].forEach(function(funcName) {
|
||||
describe('#' + funcName, function() {
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_18_019: [The `sendOutputEvent` method shall not throw if the `callback` is not passed. ]*/
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_18_022: [The `sendOutputEventBatch` method shall not throw if the `callback` is not passed. ]*/
|
||||
it('doesn\'t throw if no callback is given and the method exists on the transport', function() {
|
||||
var transport = new FakeTransport();
|
||||
var client = new ModuleClient(transport);
|
||||
client.open(function() {
|
||||
assert.doesNotThrow(function() {
|
||||
client[funcName]('message');
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('#on(\'inputMessage\')', function () {
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_18_012: [ The `inputMessage` event shall be emitted when an inputMessage is received from the IoT Hub service. ]*/
|
||||
it('emits a message event when a message is received', function (done) {
|
||||
var fakeTransport = new FakeTransport();
|
||||
var client = new ModuleClient(fakeTransport);
|
||||
client.on('inputMessage', function(inputName,msg) {
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_18_013: [ The `inputMessage` event parameters shall be the inputName for the message and a `Message` object. ]*/
|
||||
assert.strictEqual(inputName, 'fakeInputName');
|
||||
assert.strictEqual(msg.constructor.name, 'Message');
|
||||
done();
|
||||
});
|
||||
|
||||
fakeTransport.emit('inputMessage', 'fakeInputName', new Message());
|
||||
});
|
||||
});
|
||||
|
||||
[
|
||||
{
|
||||
eventName: 'inputMessage',
|
||||
enableFunc: 'enableInputMessages',
|
||||
disableFunc: 'disableInputMessages'
|
||||
}
|
||||
].forEach(function(testConfig) {
|
||||
describe('#on(\'' + testConfig.eventName + '\')', function () {
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_18_014: [ The client shall start listening for messages from the service whenever there is a listener subscribed to the `inputMessage` event. ]*/
|
||||
it('starts listening for messages when a listener subscribes to the message event', function () {
|
||||
var fakeTransport = new FakeTransport();
|
||||
sinon.spy(fakeTransport, testConfig.enableFunc);
|
||||
var client = new ModuleClient(fakeTransport);
|
||||
|
||||
// Calling 'on' twice to make sure it's called only once on the receiver.
|
||||
// It works because the test will fail if the test callback is called multiple times, and it's called for every time the testConfig.eventName event is subscribed on the receiver.
|
||||
client.on(testConfig.eventName, function () { });
|
||||
client.on(testConfig.eventName, function () { });
|
||||
assert.isTrue(fakeTransport[testConfig.enableFunc].calledOnce);
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_18_015: [ The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `inputMessage` event. ]*/
|
||||
it('stops listening for messages when the last listener has unsubscribed', function (testCallback) {
|
||||
var fakeTransport = new FakeTransport();
|
||||
sinon.spy(fakeTransport, testConfig.enableFunc);
|
||||
sinon.spy(fakeTransport, testConfig.disableFunc);
|
||||
sinon.spy(fakeTransport, 'removeAllListeners');
|
||||
|
||||
var client = new ModuleClient(fakeTransport);
|
||||
var listener1 = function () { };
|
||||
var listener2 = function () { };
|
||||
client.on(testConfig.eventName, listener1);
|
||||
client.on(testConfig.eventName, listener2);
|
||||
|
||||
process.nextTick(function() {
|
||||
client.removeListener(testConfig.eventName, listener1);
|
||||
assert.isTrue(fakeTransport[testConfig.disableFunc].notCalled);
|
||||
client.removeListener(testConfig.eventName, listener2);
|
||||
assert(fakeTransport[testConfig.disableFunc].calledOnce);
|
||||
testCallback();
|
||||
});
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_18_017: [ The client shall emit an `error` if connecting the transport fails while subscribing to `inputMessage` events. ]*/
|
||||
it('emits an error if it fails to start listening for messages', function (testCallback) {
|
||||
var fakeTransport = new FakeTransport();
|
||||
var fakeError = new Error('fake');
|
||||
sinon.stub(fakeTransport, testConfig.enableFunc).callsFake(function (callback) { callback(fakeError); });
|
||||
var client = new ModuleClient(fakeTransport);
|
||||
client.on('error', function (err) {
|
||||
assert.strictEqual(err, fakeError);
|
||||
testCallback();
|
||||
})
|
||||
|
||||
client.on(testConfig.eventName, function () { });
|
||||
assert.isTrue(fakeTransport[testConfig.enableFunc].calledOnce);
|
||||
});
|
||||
|
||||
/*Tests_SRS_NODE_MODULE_CLIENT_16_097: [The client shall emit an `error` if connecting the transport fails while unsubscribing to `inputMessage` events.]*/
|
||||
it('emits an error if it fails to stop listening for messages', function (testCallback) {
|
||||
var fakeTransport = new FakeTransport();
|
||||
var fakeError = new Error('fake');
|
||||
sinon.spy(fakeTransport, testConfig.enableFunc);
|
||||
sinon.stub(fakeTransport, testConfig.disableFunc).callsFake(function (callback) { callback(fakeError); });
|
||||
var client = new ModuleClient(fakeTransport);
|
||||
client.on('error', function (err) {
|
||||
assert.strictEqual(err, fakeError);
|
||||
testCallback();
|
||||
})
|
||||
|
||||
client.on(testConfig.eventName, function () { });
|
||||
assert.isTrue(fakeTransport[testConfig.enableFunc].calledOnce);
|
||||
client.removeAllListeners(testConfig.eventName);
|
||||
assert.isTrue(fakeTransport[testConfig.disableFunc].calledOnce);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
import { ModuleClient, Message } from 'azure-iot-device';
|
||||
import { Mqtt } from 'azure-iot-device-mqtt';
|
||||
|
||||
const module = ModuleClient.fromConnectionString(process.env.MODULE_CONNECTION_STRING, Mqtt);
|
||||
|
||||
module.sendEvent(new Message('foo'), (err) => {
|
||||
if (err) {
|
||||
console.error(err.toString());
|
||||
} else {
|
||||
console.log('message sent');
|
||||
}
|
||||
});
|
|
@ -9,7 +9,7 @@ var async = require('async');
|
|||
|
||||
var Registry = require('azure-iothub').Registry;
|
||||
var ConnectionString = require('azure-iot-common').ConnectionString;
|
||||
var DeviceClient = require('azure-iot-device').Client;
|
||||
var ModuleClient = require('azure-iot-device').ModuleClient;
|
||||
var ServiceClient = require('azure-iothub').Client;
|
||||
var DeviceIdentityHelper = require('./device_identity_helper.js');
|
||||
|
||||
|
@ -63,7 +63,7 @@ module.exports.createModule = function(testModule, Transport, done) {
|
|||
});
|
||||
},
|
||||
function connectDeviceClient(done) {
|
||||
testModule.deviceClient = DeviceClient.fromConnectionString(testModule.moduleConnectionString, Transport);
|
||||
testModule.deviceClient = ModuleClient.fromConnectionString(testModule.moduleConnectionString, Transport);
|
||||
debug('opening device client');
|
||||
testModule.deviceClient.open(function(err) {
|
||||
debug('deviceClient.open returned ' + (err ? err : 'success'));
|
||||
|
|
Загрузка…
Ссылка в новой задаче