зеркало из
1
0
Форкнуть 0

Merge remote-tracking branch 'upstream/master' into bertk-merge

This commit is contained in:
Bert Kleewein 2018-04-13 14:42:39 -07:00
Родитель 8b4ab99a95 7650afee6f
Коммит ff08bf1f9a
10 изменённых файлов: 488 добавлений и 245 удалений

5
.gitignore поставляемый
Просмотреть файл

@ -231,4 +231,7 @@ ts-e2e/lib
longhaultests/lib
# Build artifacts
build/build_parallel/temp
build/build_parallel/temp
# directory with dev-setup symlinks
npm-symlinks/

Просмотреть файл

@ -8,6 +8,9 @@ set node-root=%~dp0..
REM // resolve to fully qualified path
for %%i in ("%node-root%") do set node-root=%%~fi
set NPM_CONFIG_PREFIX=%node-root%\npm-symlinks
if NOT exist %NPM_CONFIG_PREFIX% mkdir %NPM_CONFIG_PREFIX%
echo.
echo -- Setting up build_parallel tool --
cd %node-root%\build\build_parallel

Просмотреть файл

@ -5,6 +5,11 @@
node_root=$(cd "$(dirname "$0")/.." && pwd)
export NPM_CONFIG_PREFIX=$node_root/npm-symlinks
if [ ! -d $NPM_CONFIG_PREFIX ]; then
mkdir $NPM_CONFIG_PREFIX
fi
cleanup_and_exit()
{
exit $1

Просмотреть файл

@ -8,6 +8,19 @@ set node-root=%~dp0..
REM // resolve to fully qualified path
for %%i in ("%node-root%") do set node-root=%%~fi
if "%1" == "" (
echo Not removing any files because symlinks are in %node-root%\npm-symlinks and teardown isn't necessary.
echo If you want to tear down anyway, run dev-teardown.cmd with the --force flag
exit /b 0
)
if "%1" NEQ "--force" (
echo usage: dev-teardown.cmd [--force]
exit /b 1
)
set NPM_CONFIG_PREFIX=%node-root%\npm-symlinks
echo -- Removing links for build tools --
cd %node-root%\build\tools
echo .

Просмотреть файл

@ -5,6 +5,19 @@
node_root=$(cd "$(dirname "$0")/.." && pwd)
if [ -z "$1" ]; then
echo "Not removing any files because symlinks are in $node_root\npm-symlinks and teardown isn't necessary."
echo "If you want to tear down anyway, run dev-teardown.cmd with the --force flag"
exit 0
fi
if [ "$1" != "--force" ]; then
echo "usage: dev-teardown.cmd [--force]"
exit 1
fi
export NPM_CONFIG_PREFIX=$node_root/npm-symlinks
echo "\n-- Removing links for build tools --"
cd $node_root/build/tools
npm rm azure-iothub

Просмотреть файл

@ -116,14 +116,30 @@ protocolAndTermination.forEach( function (testConfiguration) {
var provisionedDevice;
before(function (beforeCallback) {
debug('creating test device...');
DeviceIdentityHelper.createDeviceWithSas(function (err, testDeviceInfo) {
provisionedDevice = testDeviceInfo;
beforeCallback(err);
if (err) {
debug('failed to create test device: ' + err.toString());
beforeCallback(err);
} else {
debug('test device created: ' + testDeviceInfo.deviceId);
provisionedDevice = testDeviceInfo;
beforeCallback(err);
}
});
});
after(function (afterCallback) {
DeviceIdentityHelper.deleteDevice(provisionedDevice.deviceId, afterCallback);
debug('deleting device: ' + provisionedDevice.deviceId);
DeviceIdentityHelper.deleteDevice(provisionedDevice.deviceId, function (err) {
if (err) {
debug('failed to delete test device: ' + err.toString());
afterCallback(err);
} else {
debug('test device deleted');
afterCallback();
}
});
});
beforeEach(function () {
@ -133,8 +149,17 @@ protocolAndTermination.forEach( function (testConfiguration) {
});
afterEach(function (testCallback) {
closeDeviceServiceClients(deviceClient, serviceClient, testCallback);
if (sendMessageTimeout !== null) clearTimeout(sendMessageTimeout);
debug('closing device and service clients...');
closeDeviceServiceClients(deviceClient, serviceClient, function (err) {
if (err) {
debug('failed to close clients: ' + err.toString());
testCallback(err);
} else {
debug('device and service clients closed');
testCallback();
}
});
});
doConnectTest(testConfiguration.testEnabled)('Service sends a C2D message, device receives it, and' + testConfiguration.closeReason + 'which is noted by the iot hub client', function (testCallback) {
@ -206,46 +231,69 @@ protocolAndTermination.forEach( function (testConfiguration) {
});
doConnectTest(testConfiguration.testEnabled)('Service sends ' + numberOfC2DMessages + ' C2D messages, device receives first and' + testConfiguration.closeReason + 'which is never seen by the iot hub client', function (testCallback) {
var originalMessages = [];
var messagesReceived = 0;
var messagesSent = 0;
var c2dMessageSender = function() {
if (messagesSent >= numberOfC2DMessages) {
testCallback(new Error('tried to send to many messages'));
} else {
serviceClient.send(provisionedDevice.deviceId, originalMessages[messagesSent++], function (sendErr) {
debug('At service client send callback - error is: ' + sendErr);
if (sendErr) {
testCallback(sendErr);
}
});
}
};
var findMessage = function(incomingMessage, storedMessages) {
for (var j = 0; j < storedMessages.length; j++) {
if (incomingMessage.messageId === storedMessages[j].messageId) {
if (!storedMessages[j].alreadyReceived) {
storedMessages.alreadyReceived = true;
return true;
} else {
testCallback(new Error('received a message more than once'));
var originalMessages = {};
var faultInjected = false;
for (var i = 0; i < numberOfC2DMessages; i++) {
var uuidData = uuid.v4();
originalMessages[uuidData] = {
message: new Message(uuidData),
sent: false,
received: false
};
originalMessages[uuidData].message.messageId = uuidData;
originalMessages[uuidData].message.expiryTimeUtc = Date.now() + 60000;
}
var allDone = function () {
for (var messageId in originalMessages) {
if (originalMessages.hasOwnProperty(messageId)) {
debug('message ' + messageId + ': sent: ' + originalMessages[messageId].sent + '; received: ' + originalMessages[messageId].received);
if (!originalMessages[messageId].sent || !originalMessages[messageId].received) {
return false;
}
}
}
return false;
debug('allDone: true!');
return true;
};
for (var i = 0; i < numberOfC2DMessages; i++) {
var uuidData = uuid.v4();
originalMessages[i] = new Message(uuidData);
originalMessages[i].messageId = uuidData;
originalMessages[i].expiryTimeUtc = Date.now() + 60000; // Expire 60s from now, to reduce the chance of us hitting the 50-message limit on the IoT Hub
originalMessages[i].alreadyReceived = false; // Yes a bit tacky, but it really shouldn't even make it to the hub, and even if it did it wouldn't matter.
}
var sendMessage = function (messageId) {
serviceClient.send(provisionedDevice.deviceId, originalMessages[messageId].message, function (sendErr) {
if (sendErr) {
debug('failed to send message with id: ' + messageId + ': ' + sendErr.toString());
testCallback(sendErr);
} else {
debug('message sent: ' + messageId);
originalMessages[messageId].sent = true;
if (allDone()) {
debug('all messages have been sent and received!');
testCallback();
} else {
debug('still not done!');
}
}
});
};
var sendNextMessage = function() {
for (var messageId in originalMessages) {
if (originalMessages[messageId].sent) {
continue;
} else {
debug('Sending message with id: ' + messageId);
return sendMessage(messageId);
}
}
debug('all messages have been sent.');
};
debug('connecting device client...');
deviceClient.open(function (openErr) {
debug('device has opened.');
if (openErr) {
debug('error connecting device client: ' + openErr.toString());
testCallback(openErr);
} else {
debug('device client connected');
deviceClient.on('disconnect', function () {
debug('got an unexpected disconnect - this test should never see one!');
testCallback(new Error('unexpected disconnect'));
@ -257,28 +305,35 @@ protocolAndTermination.forEach( function (testConfiguration) {
debug('c2d message received with id: ' + receivedMessage.messageId);
deviceClient.complete(receivedMessage, function (err, result) {
if (err) {
debug('error while settling (accept) the message: ' + err.toString());
testCallback(err);
} else {
assert.equal(result.constructor.name, 'MessageCompleted');
debug('message completed');
//
// Make sure that the message we are looking at is one of the messages that we just sent.
//
if (findMessage(receivedMessage, originalMessages)) {
if (messagesReceived++ === 0) {
if (originalMessages[receivedMessage.messageId]) {
originalMessages[receivedMessage.messageId].received = true;
if (!faultInjected) {
var terminateMessage = new Message('');
terminateMessage.properties.add('AzIoTHub_FaultOperationType', testConfiguration.operationType);
terminateMessage.properties.add('AzIoTHub_FaultOperationCloseReason', testConfiguration.closeReason);
terminateMessage.properties.add('AzIoTHub_FaultOperationDelayInSecs', testConfiguration.delayInSeconds);
debug('Injecting fault: ' + testConfiguration.operationType);
faultInjected = true;
deviceClient.sendEvent(terminateMessage, function (sendErr) {
debug('at the callback for the fault injection send, err is:' + sendErr);
});
sendMessageTimeout = setTimeout(c2dMessageSender.bind(this), 5000);
}
if (allDone()) {
debug('all messages have been received. test successful');
testCallback();
} else {
if (messagesReceived === numberOfC2DMessages) {
testCallback();
} else {
sendMessageTimeout = setTimeout(c2dMessageSender.bind(this), 3000);
}
debug('scheduling next message in 3 seconds');
sendMessageTimeout = setTimeout(sendNextMessage, 3000);
}
} else {
debug('received an unanticipated message, id: ' + receivedMessage.messageId + ' data: ' + receivedMessage.data.toString());
@ -286,13 +341,15 @@ protocolAndTermination.forEach( function (testConfiguration) {
}
});
});
debug('about to open the service client');
debug('connecting service client...');
serviceClient.open(function (serviceErr) {
debug('At service client open callback - error is:' + serviceErr);
if (serviceErr) {
debug('Failed to connect servic client:' + serviceErr.toString());
testCallback(serviceErr);
} else {
c2dMessageSender();
debug('service client connected');
debug('sending first message');
sendNextMessage();
}
});
}

Просмотреть файл

@ -128,9 +128,18 @@ protocolAndTermination.forEach( function (testConfiguration) {
ehReceivers = [];
});
afterEach(function (testCallback) {
closeDeviceEventHubClients(deviceClient, ehClient, ehReceivers, testCallback);
afterEach(function (afterEachCallback) {
if (sendMessageTimeout !== null) clearTimeout(sendMessageTimeout);
debug('closing device and event hubs clients...');
closeDeviceEventHubClients(deviceClient, ehClient, ehReceivers, function (err) {
if (err) {
debug('error closing clients: ' + err.toString());
afterEachCallback(err);
} else {
debug('device and event hubs client closed.');
afterEachCallback();
}
});
});
doConnectTest(testConfiguration.testEnabled)('device sends a message, event hub client receives it, and' + testConfiguration.closeReason + 'which is noted by the iot hub device client', function (testCallback) {
@ -197,48 +206,68 @@ protocolAndTermination.forEach( function (testConfiguration) {
});
doConnectTest(testConfiguration.testEnabled)('device sends ' + numberOfD2CMessages + ' messages, when event hub client receives first, it ' + testConfiguration.closeReason + 'which is not seen by the iot hub device client', function (testCallback) {
var originalMessages = [];
var messagesReceived = 0;
var messagesSent = 0;
var d2cMessageSender = function() {
debug('Sending message number: ' + (messagesSent + 1));
if (messagesSent >= numberOfD2CMessages) {
testCallback(new Error('tried to send to many messages'));
} else {
deviceClient.sendEvent(originalMessages[messagesSent++], function (sendErr) {
debug('At device client send callback - error is: ' + sendErr);
if (sendErr) {
testCallback(sendErr);
}
});
}
};
var findMessage = function(incomingMessage, storedMessages) {
if (incomingMessage.properties && incomingMessage.properties.messageId) {
for (var j = 0; j < storedMessages.length; j++) {
if (incomingMessage.properties.messageId === storedMessages[j].messageId) {
if (!storedMessages[j].alreadyReceived) {
storedMessages.alreadyReceived = true;
return true;
} else {
testCallback(new Error('received a message more than once'));
}
var originalMessages = {};
var faultInjected = false;
for (var i = 0; i < numberOfD2CMessages; i++) {
var uuidData = uuid.v4();
originalMessages[uuidData] = {
message: new Message(uuidData),
sent: false,
received: false
};
}
var allDone = function () {
for (var messageId in originalMessages) {
if (originalMessages.hasOwnProperty(messageId)) {
debug('message ' + messageId + ': sent: ' + originalMessages[messageId].sent + '; received: ' + originalMessages[messageId].received);
if (!originalMessages[messageId].sent || !originalMessages[messageId].received) {
return false;
}
}
}
return false;
debug('allDone: true!');
return true;
};
for (var i = 0; i < numberOfD2CMessages; i++) {
var uuidData = uuid.v4();
originalMessages[i] = new Message(uuidData);
originalMessages[i].messageId = uuidData;
originalMessages[i].alreadyReceived = false;
}
var sendMessage = function (messageId) {
deviceClient.sendEvent(originalMessages[messageId].message, function (sendErr) {
if (sendErr) {
debug('failed to send message with id: ' + messageId + ': ' + sendErr.toString());
testCallback(sendErr);
} else {
debug('message sent: ' + messageId);
originalMessages[messageId].sent = true;
if (allDone()) {
debug('all messages have been sent and received!');
testCallback();
} else {
debug('still not done!');
}
}
});
};
var sendNextMessage = function() {
for (var messageId in originalMessages) {
if (originalMessages[messageId].sent) {
continue;
} else {
debug('Sending message with id: ' + messageId);
return sendMessage(messageId);
}
}
debug('all messages have been sent.');
};
var startAfterTime = Date.now() - 5000;
debug('starting to listen to messages received since: ' + new Date(startAfterTime).toISOString());
ehClient.open()
.then(ehClient.getPartitionIds.bind(ehClient))
.then(function (partitionIds) {
return partitionIds.map(function (partitionId) {
return ehClient.createReceiver('$Default', partitionId, { 'startAfterTime' : Date.now() }).then(function (receiver) {
return ehClient.createReceiver('$Default', partitionId, { 'startAfterTime' : startAfterTime }).then(function (receiver) {
ehReceivers.push(receiver);
receiver.on('errorReceived', function(err) {
testCallback(err);
@ -246,22 +275,26 @@ protocolAndTermination.forEach( function (testConfiguration) {
receiver.on('message', function (eventData) {
if (eventData.annotations['iothub-connection-device-id'] === provisionedDevice.deviceId) {
debug('did get a message for this device.');
if (findMessage(eventData, originalMessages)) {
debug('It was one of the messages we sent.');
if (messagesReceived++ === 0) {
debug('It was the first message.');
var receivedMessageId = eventData.body.toString();
if (originalMessages[receivedMessageId]) {
debug('It was one of the messages we sent: ' + receivedMessageId);
originalMessages[receivedMessageId].received = true;
if (!faultInjected) {
debug('Fault has not been injected yet. Failing now...');
var terminateMessage = new Message('');
terminateMessage.properties.add('AzIoTHub_FaultOperationType', testConfiguration.operationType);
terminateMessage.properties.add('AzIoTHub_FaultOperationCloseReason', testConfiguration.closeReason);
terminateMessage.properties.add('AzIoTHub_FaultOperationDelayInSecs', testConfiguration.delayInSeconds);
faultInjected = true;
deviceClient.sendEvent(terminateMessage, function (sendErr) {
debug('at the callback for the fault injection send, err is:' + sendErr);
});
}
if (messagesReceived === numberOfD2CMessages) {
if (allDone()) {
testCallback();
} else {
sendMessageTimeout = setTimeout(d2cMessageSender.bind(this), 5000);
sendMessageTimeout = setTimeout(sendNextMessage, 3000);
}
} else {
debug('eventData message id doesn\'t match any stored message id');
@ -281,7 +314,7 @@ protocolAndTermination.forEach( function (testConfiguration) {
deviceClient.on('disconnect', function () {
testCallback(new Error('unexpected disconnect'));
});
d2cMessageSender();
sendNextMessage();
}
});
})

Просмотреть файл

@ -5,7 +5,7 @@
var Registry = require('azure-iothub').Registry;
var ConnectionString = require('azure-iothub').ConnectionString;
var debug = require('debug')('e2etests:twindisconnect');
var debug = require('debug')('e2etests:twin_disconnect');
var Message = require('azure-iot-common').Message;
var deviceSdk = require('azure-iot-device');
var deviceMqtt = require('azure-iot-device-mqtt');
@ -196,11 +196,14 @@ protocolAndTermination.forEach( function (testConfiguration) {
});
assert.equal(deviceTwin.properties.desired.$version, 1);
deviceTwin.on('properties.desired', function() {
if (deviceTwin.properties.desired.$version === 2) {
if (deviceTwin.properties.desired.$version === 1) {
debug('received notification for desired property v1. nothing to do');
} else if (deviceTwin.properties.desired.$version === 2) {
var terminateMessage = new Message(' ');
terminateMessage.properties.add('AzIoTHub_FaultOperationType', testConfiguration.operationType);
terminateMessage.properties.add('AzIoTHub_FaultOperationCloseReason', testConfiguration.closeReason);
terminateMessage.properties.add('AzIoTHub_FaultOperationDelayInSecs', testConfiguration.delayInSeconds);
debug('sending fault injection message');
deviceClient.sendEvent(terminateMessage, function (sendErr) {
debug('at the callback for the fault injection send, err is:' + sendErr);
});
@ -208,9 +211,15 @@ protocolAndTermination.forEach( function (testConfiguration) {
testCallback(new Error('incorrect property version received - ' + deviceTwin.properties.desired.$version));
}
});
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
if (err) return testCallback(err);
});
// giving a few seconds for the twin subscription to happen before we send the update.
setTimeout(function () {
debug('Updating twin properties');
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
debug('twin properties updated. version should now be 2');
if (err) return testCallback(err);
});
}, 3000);
});
doConnectTest(testConfiguration.testEnabled)('Simple twin update: device receives it, and' + testConfiguration.closeReason + 'which is NOT noted by the iot hub client', function(testCallback) {
@ -229,27 +238,34 @@ protocolAndTermination.forEach( function (testConfiguration) {
assert.equal(deviceTwin.properties.desired.$version,1);
deviceTwin.on('properties.desired', function() {
if (deviceTwin.properties.desired.$version === 1) {
// ignore $update === 1. assert needed to make jshint happy
assert(true);
debug('received notification for desired property v1. nothing to do');
} else if (deviceTwin.properties.desired.$version === 2) {
var terminateMessage = new Message(' ');
terminateMessage.properties.add('AzIoTHub_FaultOperationType', testConfiguration.operationType);
terminateMessage.properties.add('AzIoTHub_FaultOperationCloseReason', testConfiguration.closeReason);
terminateMessage.properties.add('AzIoTHub_FaultOperationDelayInSecs', testConfiguration.delayInSeconds);
debug('sending fault injection message');
deviceClient.sendEvent(terminateMessage, function (sendErr) {
debug('at the callback for the fault injection send, err is:' + sendErr);
});
setTwinMoreNewPropsTimeout = setTimeout(setTwinMoreNewProps, (testConfiguration.delayInSeconds + 5) * 1000);
} else if (deviceTwin.properties.desired.$version === 3) {
debug('received notification for desired property v3. test is successful.');
testCallback();
} else {
debug('incorrect property version received. exiting test with an error.');
testCallback(new Error('incorrect property version received - ' + deviceTwin.properties.desired.$version));
}
});
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
if (err) return testCallback(err);
});
// giving a few seconds for the twin subscription to happen before we send the update.
setTimeout(function () {
debug('Updating twin properties');
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
debug('twin properties updated. version should now be 2');
if (err) return testCallback(err);
});
}, 3000);
});
});
});

Просмотреть файл

@ -12,6 +12,7 @@ var uuid = require('uuid');
var _ = require('lodash');
var assert = require('chai').assert;
var async = require('async');
var debug = require('debug')('e2etests:twin_e2e');
var deviceAmqp = require('azure-iot-device-amqp');
var deviceMqtt = require('azure-iot-device-mqtt');
@ -51,13 +52,11 @@ delete nullMergeResult.tweedle;
].forEach(function(protocolCtor) {
describe('Twin over ' + protocolCtor.name, function() {
this.timeout(60000);
var deviceClient, deviceTwin;
var serviceTwin;
var deviceDescription, deviceClient, deviceTwin, serviceTwin;
var host = ConnectionString.parse(hubConnectionString).HostName;
var registry = Registry.fromConnectionString(hubConnectionString);
var deviceDescription;
beforeEach(function (done) {
var host = ConnectionString.parse(hubConnectionString).HostName;
before(function (done) {
var pkey = new Buffer(uuid.v4()).toString('base64');
var deviceId = '0000e2etest-delete-me-twin-e2e-' + protocolCtor.name + '-' + uuid.v4();
@ -73,42 +72,81 @@ delete nullMergeResult.tweedle;
connectionString: 'HostName=' + host + ';DeviceId=' + deviceId + ';SharedAccessKey=' + pkey
};
var registry = Registry.fromConnectionString(hubConnectionString);
debug('creating test device: ' + deviceId + ' on hub: ' + host);
registry.create(deviceDescription, function (err) {
if (err) return done(err);
if (err) {
debug('error creating test device: ' + err.toString());
return done(err);
} else {
debug('test device successfully created: ' + deviceId);
return done();
}
});
});
var sas = deviceSas.create(host, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString();
deviceClient = deviceSdk.Client.fromSharedAccessSignature(sas, protocolCtor);
after(function (done) {
debug('deleting test device: ' + deviceDescription.deviceId);
registry.delete(deviceDescription.deviceId, function(err) {
if (err) {
debug('Error deleting test device: ' + err.toString());
return done(err);
} else {
debug('test device deleted: ' + deviceDescription.deviceId);
return done();
}
});
});
deviceClient.open(function(err) {
if (err) return done(err);
beforeEach(function (done) {
var sas = deviceSas.create(host, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString();
deviceClient = deviceSdk.Client.fromSharedAccessSignature(sas, protocolCtor);
debug('device client connecting...');
deviceClient.open(function(err) {
if (err) {
debug('error connecting the device client: ' + err.toString());
return done(err);
} else {
debug('device client connected - getting twin');
deviceClient.getTwin(function(err, twin) {
if (err) return done(err);
deviceTwin = twin;
if (err) {
debug('error getting device twin: ' + err.toString());
return done(err);
} else {
debug('device got its twin');
deviceTwin = twin;
registry.getTwin(deviceDescription.deviceId, function(err, twin) {
if (err) return done(err);
serviceTwin = twin;
done();
});
debug('service getting device twin...');
registry.getTwin(deviceDescription.deviceId, function(err, twin) {
if (err) {
debug('error getting the device twin on the service side: ' + err.toString());
return done(err);
} else {
debug('got device twin on the service side');
serviceTwin = twin;
done();
}
});
}
});
});
}
});
});
afterEach(function (done) {
if (deviceClient) {
debug('closing device client');
deviceClient.close(function(err) {
if (err) return done(err);
var registry = Registry.fromConnectionString(hubConnectionString);
registry.delete(deviceDescription.deviceId, function(err) {
if (err) return done(err);
if (err) {
debug('error closing the device client: ' + err.toString());
return done(err);
} else {
debug('device client closed');
done();
});
}
});
} else {
debug('no device client. continuing...');
done();
}
});
@ -134,114 +172,211 @@ delete nullMergeResult.tweedle;
};
it('relies on $version starting at 1 and incrementing by 1 each time', function(done) {
assert.equal(deviceTwin.properties.desired.$version,1);
assert.equal(deviceTwin.properties.desired.$version, 1);
deviceTwin.on('properties.desired', function() {
debug('desired property update with version: ' + deviceTwin.properties.desired.$version);
if (deviceTwin.properties.desired.$version === 1) {
// ignore $update === 1. assert needed to make jshint happy
assert(true);
debug('initial property update received');
debug('updating desired properties using the service API');
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
if (err) {
debug('error sending the desired properties update:' + err.toString());
return done(err);
} else {
debug('desired properties update sent');
}
});
} else if (deviceTwin.properties.desired.$version === 2) {
debug('desired property update received, version is 2. test successful');
done();
} else {
debug('incorrect property version received: ' + deviceTwin.properties.desired.$version);
done(new Error('incorrect property version received - ' + deviceTwin.properties.desired.$version));
}
});
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
if (err) return done(err);
});
});
var sendsAndReceiveReportedProperties = function(done) {
assertObjectIsEmpty(serviceTwin.properties.reported);
assertObjectIsEmpty(deviceTwin.properties.reported);
debug('updating reported properties');
deviceTwin.properties.reported.update(newProps, function(err) {
if (err) return done(err);
serviceTwin.get(function(err) {
if (err) return done(err);
assertObjectsAreEqual(newProps, serviceTwin.properties.reported);
done();
});
if (err) {
debug('error updating reported properties: ' + err.toString());
return done(err);
} else {
debug('reported properties updated');
setTimeout(function () {
debug('getting twin from the service side');
serviceTwin.get(function(err) {
if (err) {
debug('error getting twin on the service side');
return done(err);
} else {
debug('got twin from the service side');
assertObjectsAreEqual(newProps, serviceTwin.properties.reported);
done();
}
});
}, 3000);
}
});
};
it('sends and receives reported properties', sendsAndReceiveReportedProperties);
var mergeReportedProperties = function(first, second, result, done) {
debug('sending first reported properties update');
deviceTwin.properties.reported.update(first, function(err) {
if (err) return done(err);
deviceTwin.properties.reported.update(second, function(err) {
if (err) return done(err);
serviceTwin.get(function(err) {
if (err) return done(err);
assertObjectsAreEqual(serviceTwin.properties.reported, result);
done();
if (err) {
debug('error updating reported properties: ' + err.toString());
return done(err);
} else {
debug('first reported property update sent.');
debug('sending second reported property update');
deviceTwin.properties.reported.update(second, function(err) {
if (err) {
debug('error sending second reported property update: ' + err.toString());
return done(err);
} else {
setTimeout(function () {
serviceTwin.get(function(err) {
if (err) return done(err);
assertObjectsAreEqual(serviceTwin.properties.reported, result);
done();
});
}, 3000);
}
});
});
}
});
};
it('sends and receives merged reported properties', function(done) {
it('device sends reported properties and the service gets them', sendsAndReceiveReportedProperties);
it('device sends more reported properties, they are merged, and the service gets them', function(done) {
mergeReportedProperties(newProps, moreNewProps, mergeResult, done);
});
var sendsAndReceivesDesiredProperties = function(done) {
assertObjectIsEmpty(deviceTwin.properties.desired);
var initialPropertyVersion = deviceTwin.properties.desired.$version;
var updatedPropertyVersion = initialPropertyVersion + 1;
deviceTwin.on('properties.desired', function(props) {
if (props.$version === 1) {
// ignore
assert(true);
} else if (props.$version === 2) {
debug('desired properties update received with version: ' + props.$version);
if (props.$version === initialPropertyVersion) {
debug('initial property update: ignoring');
// wait a little before triggering the properties update to account for subscription time.
setTimeout(function () {
debug('sending desired properties update');
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
if (err) {
debug('error sending desired properties update: ' + err.toString());
return done(err);
} else {
debug('desired properties update sent');
}
});
}, 3000);
} else if (props.$version === updatedPropertyVersion) {
debug('updated properties received');
assertObjectsAreEqual(newProps, deviceTwin.properties.desired);
done();
} else {
debug('unexpected property version! test failure.');
done(new Error('incorrect property version received - ' + props.$version));
}
});
serviceTwin.update( { properties : { desired : newProps } }, function(err) {
if (err) return done(err);
});
};
it('sends and receives desired properties', sendsAndReceivesDesiredProperties);
var mergeDesiredProperties = function(first, second, newEtag, result, done) {
var initialPropertyVersion = deviceTwin.properties.desired.$version;
var firstUpdateVersion = initialPropertyVersion + 1;
var secondUpdateVersion = firstUpdateVersion + 1;
debug('initial property version: ' + initialPropertyVersion);
debug('first update version: ' + firstUpdateVersion);
debug('second update version: ' + secondUpdateVersion);
deviceTwin.on('properties.desired', function(props) {
if (props.$version === 1 || props.$version === 2) {
// ignore
assert(true);
} else if (props.$version === 3) {
debug('property update with $version: ' + props.$version);
if (props.$version === initialPropertyVersion) {
debug('received initial properties');
setTimeout(function () {
debug('sending first desired properties update');
serviceTwin.update( { properties : { desired : first } }, function(err) {
if (err) {
debug('failed to send the first desired properties update');
return done(err);
} else {
debug('first desired properties update successful');
}
});
}, 3000);
} else if (props.$version === firstUpdateVersion) {
setTimeout(function () {
if (newEtag) {
debug('setting serviceTwin etag: ' + newEtag);
assert.isDefined(serviceTwin.etag);
assert.notEqual(serviceTwin.etag, "*");
serviceTwin.etag = newEtag;
}
debug('sending second desired properties update');
serviceTwin.update( { properties : { desired : second } }, function(err) {
if (err) {
debug('failed to send the second desired properties update.');
return done(err);
} else {
debug('service successfully updated the desired properties');
}
});
}, 3000);
} else if (props.$version >= secondUpdateVersion) {
debug('second update received. asserting equality');
assertObjectsAreEqual(deviceTwin.properties.desired, result);
debug('desired properties received. test successful');
done();
} else {
debug('unexpected property version received: ' + props.$version + ' ; test failure!');
done(new Error('incorrect property version received - ' + props.$version));
}
});
serviceTwin.update( { properties : { desired : first } }, function(err) {
if (err) return done(err);
if (newEtag) {
assert.isDefined(serviceTwin.etag);
assert.notEqual(serviceTwin.etag, "*");
serviceTwin.etag = newEtag;
}
serviceTwin.update( { properties : { desired : second } }, function(err) {
if (err) return done(err);
});
});
};
it('sends and receives merged desired properties', function(done) {
it('service sends desired properties and device receives them', sendsAndReceivesDesiredProperties);
it('service sends new desired properties, they are merged and the device receives them', function(done) {
mergeDesiredProperties(newProps, moreNewProps, null, mergeResult, done);
});
it('sends and receives merged desired properties using etag *', function(done) {
it('service sends desired properties using etag *, they are merged and the device receives them', function(done) {
mergeDesiredProperties(newProps, moreNewProps, "*", mergeResult, done);
});
it('can get and set tags', function(done) {
var mergeTags = function(first, second, newEtag, result, done) {
debug('sending first tag update..');
serviceTwin.update( { tags : first }, function(err) {
if (err) {
debug('error updating tags: ' + err.toString());
return done(err);
} else {
debug('first twin update sent');
if (newEtag) {
assert.isDefined(serviceTwin.etag);
assert.notEqual(serviceTwin.etag, "*");
debug('setting twin etag to: ' + newEtag);
serviceTwin.etag = newEtag;
}
debug('sending second tag update...');
serviceTwin.update( { tags: second }, function(err) {
if (err) {
debug('error sending second tag update: ' + err.toString());
return done(err);
} else {
debug('second tag update sent');
assertObjectsAreEqual(serviceTwin.tags, result);
done();
}
});
}
});
};
it('service can get and set tags', function(done) {
assertObjectIsEmpty(serviceTwin.tags);
serviceTwin.update( { tags : newProps }, function(err) {
@ -252,54 +387,40 @@ delete nullMergeResult.tweedle;
});
});
var mergeTags = function(first, second, newEtag, result, done) {
assertObjectIsEmpty(serviceTwin.tags);
serviceTwin.update( { tags : first }, function(err) {
if (err) return done(err);
if (newEtag) {
assert.isDefined(serviceTwin.etag);
assert.notEqual(serviceTwin.etag, "*");
serviceTwin.etag = newEtag;
}
serviceTwin.update( { tags: second }, function(err) {
if (err) return done(err);
assertObjectsAreEqual(serviceTwin.tags, result);
done();
});
});
};
it('can merge tags', function(done) {
it('service can merge new tags', function(done) {
mergeTags(newProps, moreNewProps, null, mergeResult, done);
});
it('can merge tags using etag *', function(done) {
it('service can merge new tags using etag *', function(done) {
mergeTags(newProps, moreNewProps, "*", mergeResult, done);
});
it.skip('can send reported properties to the service after renewing the sas token', function(done) {
deviceClient.on('_sharedAccessSignatureUpdated', function() {
// _sharedAccessSignatureUpdated fired when the signature has been updated,
// but we still have to wait for the library to connect and register for events.
// We really need a "twinReady" event, but we don't have one right now.
setTimeout(function() {
it('can send reported properties to the service after renewing the sas token', function(done) {
var newSas = deviceSas.create(ConnectionString.parse(hubConnectionString).HostName, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString();
debug('updating the shared access signature for device: ' + deviceDescription.deviceId);
deviceClient.updateSharedAccessSignature(newSas, function (err) {
if (err) {
debug('error renewing the shared access signature: ' + err.toString());
done(err);
} else {
debug('updating reported properties');
sendsAndReceiveReportedProperties(done);
}, 1000);
}
});
deviceClient.updateSharedAccessSignature(deviceSas.create(ConnectionString.parse(hubConnectionString).HostName, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString());
});
it.skip('can receive desired properties from the service after renewing the sas token', function(done) {
deviceClient.on('_sharedAccessSignatureUpdated', function() {
// See note above about "twinReady" event.
setTimeout(function() {
it('can receive desired properties from the service after renewing the sas token', function(done) {
var newSas = deviceSas.create(ConnectionString.parse(hubConnectionString).HostName, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString();
debug('updating the shared access signature for device: ' + deviceDescription.deviceId);
deviceClient.updateSharedAccessSignature(newSas, function (err) {
if (err) {
debug('error renewing the shared access signature: ' + err.toString());
done(err);
} else {
debug('updating desired properties');
sendsAndReceivesDesiredProperties(done);
}, 1000);
}
});
deviceClient.updateSharedAccessSignature(deviceSas.create(ConnectionString.parse(hubConnectionString).HostName, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString());
});
it.skip('call null out all reported properties', function(done) {
@ -342,27 +463,6 @@ delete nullMergeResult.tweedle;
mergeTags(newProps, nullIndividualProps, "*", nullMergeResult, done);
});
it('can renew SAS 20 times without failure', function(done)
{
this.timeout(180000);
var iteration = 0;
var doItAgain = function() {
iteration++;
if (iteration === 20) {
done();
} else {
deviceClient.updateSharedAccessSignature(deviceSas.create(ConnectionString.parse(hubConnectionString).HostName, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString());
// at this point, signature renewal has begun, but the connection is
// not complete. We need a "connection complete" event here, but
// we don't have one yet. Instead, sleep for a while -- 3 seconds
// is good enough for now.
setTimeout(doItAgain, 3000);
}
};
doItAgain();
});
it('can set desired properties while the client is disconnected', function(done) {
async.series([
function setDesiredProperties(callback) {

Просмотреть файл

@ -30,7 +30,7 @@ var registry = Registry.fromConnectionString(registryConnectionString);
var X509IndividualTransports = [ Http, Amqp, AmqpWs, Mqtt, MqttWs ];
var X509GroupTransports = [ Http, Amqp, AmqpWs, Mqtt, MqttWs ];
var X509GroupTransports = [ Http, Amqp, Mqtt, MqttWs ]; // AmqpWs is disabled because of an occasional ECONNRESET error when closing the socket. See Task 2233264.
var TpmIndividualTransports = [ Http, Amqp, AmqpWs ];
var rootCert = {