This commit is contained in:
Ian Hollier 2021-02-15 13:59:01 -08:00
Родитель 11662c0f09
Коммит c397ac764d
3 изменённых файлов: 351 добавлений и 0 удалений

153
Azure_function/index.js Normal file
Просмотреть файл

@ -0,0 +1,153 @@
/*!
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
const fetch = require('node-fetch');
const handleMessage = require('./lib/engine');
const msiEndpoint = process.env.MSI_ENDPOINT;
const msiSecret = process.env.MSI_SECRET;
const parameters = {
idScope: process.env.ID_SCOPE,
primaryKeyUrl: process.env.IOTC_KEY_URL
};
let kvToken;
module.exports = async function (context, req) {
try {
/* Add the compute or data transformation code here
* In this example we have a mini pipeline of operations:
*
* 1 - Convert the telemetry from a comma seperated values (CSV) to JSON
* 2 - Convert the temperature value from centigrade to fahrenheit
* 3 - Enrich the telemetry from the external source Open Weather Service
*
*/
// transform the data from CSV to JSON
let computedTelemetry = await convertFromCsvToJson(context, req);
// convert the temperatures from centigrade to fahrenheit
computedTelemetry.temp = await convertTemperature(context, computedTelemetry.temp);
// get external weather data
const openWeatherAppId = 'c5c93a9ef23935b7be775f03f14b5618';
computedTelemetry.weather = await getWeatherData(context, openWeatherAppId, computedTelemetry.lat, computedTelemetry.lon);
// get the deviceId from the incoming message data
let device = {};
device.deviceId = req.body.deviceId;
// enqueuedTime from the export message is used so enqueue time for computed values is
await handleMessage({ ...parameters, log: context.log, getSecret: getKeyVaultSecret }, device, computedTelemetry, req.body.enqueuedTime);
} catch (e) {
context.log('[ERROR]', e.message);
context.res = {
status: e.statusCode ? e.statusCode : 500,
body: e.message
};
}
}
/**
* Is a number helper function
*/
function isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n);
}
/**
* Transform the data from CSV to JSON
*/
async function convertFromCsvToJson(context, req) {
let jsonData = {}
const columnHeaders = ['temp', 'humidity','lat', 'lon'];
const csvData = req.body.telemetry.data.split(',');
if (columnHeaders.length === csvData.length) {
for (let i = 0; i < csvData.length; i++) {
csvData[i] = csvData[i].trim();
if (isNumeric(csvData[i])) {
jsonData[columnHeaders[i]] = csvData[i] * 1;
} else {
jsonData[columnHeaders[i]] = csvData[i];
}
}
} else {
context.log('Error: The number of headers and CSV elements do not match')
}
return jsonData;
}
/**
* Convert temperature from centigrade to fahrenheit
*/
async function convertTemperature(context, tempInCentigrade) {
return ((tempInCentigrade * (9 / 5)) + 32).toFixed(2) * 1;
}
/**
* Fetch weather data from Open Weather Service
*/
async function getWeatherData(context, appid, lat, lon) {
// fetch weather data from Open Weather Service
if (lat && lon) {
const weatherUrl = `https://api.openweathermap.org/data/2.5/onecall?lat=${lat}&lon=${lon}&appid=${appid}&exclude=hourly,daily,minutely&units=imperial`;
const options = {
method: 'GET',
};
const weatherDataResponse = await fetch(weatherUrl, options).then(res => res.json());
let weather = {};
weather.weather_temp = weatherDataResponse.current.temp;
weather.weather_humidity = weatherDataResponse.current.humidity;
weather.weather_pressure = weatherDataResponse.current.pressure;
weather.weather_windspeed = weatherDataResponse.current.wind_speed;
weather.weather_clouds = weatherDataResponse.current.clouds;
weather.weather_uvi = weatherDataResponse.current.uvi;
return weather;
}
}
/**
* Fetches a Key Vault secret. Attempts to refresh the token on authorization errors.
*/
async function getKeyVaultSecret(context, secretUrl, forceTokenRefresh = false) {
if (!kvToken || forceTokenRefresh) {
const url = `${msiEndpoint}/?resource=https://vault.azure.net&api-version=2017-09-01`;
const options = {
method: 'GET',
headers: { 'Secret': msiSecret }
};
try {
context.log('[HTTP] Requesting new Key Vault token');
const response = await fetch(url, options).then(res => res.json())
kvToken = response.access_token;
} catch (e) {
context.log('fail: ' + e);
throw new Error('Unable to get Key Vault token');
}
}
url = `${secretUrl}?api-version=2016-10-01`;
var options = {
method : 'GET',
headers : { 'Authorization' : `Bearer ${kvToken}` },
};
try {
context.log('[HTTP] Requesting Key Vault secret', secretUrl);
const response = await fetch(url, options).then(res => res.json())
return response && response.value;
} catch(e) {
if (e.statusCode === 401 && !forceTokenRefresh) {
return await getKeyVaultSecret(context, secretUrl, true);
} else {
throw new Error('Unable to fetch secret');
}
}
}

Просмотреть файл

@ -0,0 +1,187 @@
/*!
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
const crypto = require('crypto');
const fetch = require('node-fetch');
const Device = require('azure-iot-device');
const DeviceTransport = require('azure-iot-device-http');
const StatusError = require('../error').StatusError;
const registrationHost = 'global.azure-devices-provisioning.net';
const registrationSasTtl = 3600; // 1 hour
const registrationApiVersion = `2019-03-31`;
const registrationStatusQueryAttempts = 10;
const registrationStatusQueryTimeout = 2000;
const minDeviceRegistrationTimeout = 60*1000; // 1 minute
const deviceCache = {};
/**
* Forwards external telemetry messages for IoT Central devices.
* @param {{ idScope: string, primaryKeyUrl: string, log: Function, getSecret: (context: Object, secretUrl: string) => string }} context
* @param {{ deviceId: string }} device
* @param {{ [field: string]: number }} measurements
*/
module.exports = async function (context, device, measurements, timestamp) {
if (device) {
if (!device.deviceId || !/^[a-zA-Z0-9-._:]*[a-zA-Z0-9-]+$/.test(device.deviceId)) {
throw new StatusError("Invalid format: deviceId must be alphanumeric and may contain '-', '.', '_', ':'. Last character must be alphanumeric or hyphen.", 400);
}
} else {
throw new StatusError('Invalid format: a device specification must be provided.', 400);
}
if (!validateMeasurements(measurements)) {
throw new StatusError('Invalid format: invalid measurement list.', 400);
}
if (timestamp && isNaN(Date.parse(timestamp))) {
throw new StatusError('Invalid format: if present, timestamp must be in ISO format (e.g., YYYY-MM-DDTHH:mm:ss.sssZ)', 400);
}
const client = Device.Client.fromConnectionString(await getDeviceConnectionString(context, device), DeviceTransport.Http);
try {
const message = new Device.Message(JSON.stringify(measurements));
message.contentEncoding = 'utf-8';
message.contentType = 'application/json';
if (timestamp) {
message.properties.add('iothub-creation-time-utc', timestamp);
}
// add a message property that we can look for in data export to not re-compute computed telemetry
message.properties.add('computed', true);
await client.open();
context.log('[HTTP] Sending telemetry for device', device.deviceId);
await client.sendEvent(message);
await client.close();
} catch (e) {
// If the device was deleted, we remove its cached connection string
if (e.name === 'DeviceNotFoundError' && deviceCache[device.deviceId]) {
delete deviceCache[device.deviceId].connectionString;
}
throw new Error(`Unable to send telemetry for device ${device.deviceId}: ${e.message}`);
}
};
/**
* @returns true if measurements object is valid, i.e., a map of field names to numbers or strings.
*/
function validateMeasurements(measurements) {
if (!measurements || typeof measurements !== 'object') {
return false;
}
return true;
}
async function getDeviceConnectionString(context, device) {
const deviceId = device.deviceId;
if (deviceCache[deviceId] && deviceCache[deviceId].connectionString) {
return deviceCache[deviceId].connectionString;
}
const connStr = `HostName=${await getDeviceHub(context, device)};DeviceId=${deviceId};SharedAccessKey=${await getDeviceKey(context, deviceId)}`;
deviceCache[deviceId].connectionString = connStr;
return connStr;
}
/**
* Registers this device with DPS, returning the IoT Hub assigned to it.
*/
async function getDeviceHub(context, device) {
const deviceId = device.deviceId;
const now = Date.now();
// A 1 minute backoff is enforced for registration attempts, to prevent unauthorized devices
// from trying to re-register too often.
if (deviceCache[deviceId] && deviceCache[deviceId].lasRegisterAttempt && (now - deviceCache[deviceId].lasRegisterAttempt) < minDeviceRegistrationTimeout) {
const backoff = Math.floor((minDeviceRegistrationTimeout - (now - deviceCache[deviceId].lasRegisterAttempt)) / 1000);
throw new StatusError(`Unable to register device ${deviceId}. Minimum registration timeout not yet exceeded. Please try again in ${backoff} seconds`, 403);
}
deviceCache[deviceId] = {
...deviceCache[deviceId],
lasRegisterAttempt: Date.now()
}
const sasToken = await getRegistrationSasToken(context, deviceId);
url = `https://${registrationHost}/${context.idScope}/registrations/${deviceId}/register?api-version=${registrationApiVersion}`;
const registrationOptions = {
method: 'PUT',
headers: { 'Content-Type': 'application/json', Authorization: sasToken },
body: JSON.stringify({ registrationId: deviceId, payload: { iotcModelId: device.modelId } })
};
try {
context.log('[HTTP] Initiating device registration');
const response = await fetch(url, registrationOptions).then(res => res.json());
if (response.status !== 'assigning' || !response.operationId) {
throw new Error('Unknown server response');
}
url = `https://${registrationHost}/${context.idScope}/registrations/${deviceId}/operations/${response.operationId}?api-version=${registrationApiVersion}`;
const statusOptions = {
method: 'GET',
headers: { Authorization: sasToken }
};
// The first registration call starts the process, we then query the registration status
// every 2 seconds, up to 10 times.
for (let i = 0; i < registrationStatusQueryAttempts; ++i) {
await new Promise(resolve => setTimeout(resolve, registrationStatusQueryTimeout));
context.log('[HTTP] Querying device registration status');
const statusResponse = await fetch(url, statusOptions).then(res => res.json());
if (statusResponse.status === 'assigning') {
continue;
} else if (statusResponse.status === 'assigned' && statusResponse.registrationState && statusResponse.registrationState.assignedHub) {
return statusResponse.registrationState.assignedHub;
} else if (statusResponse.status === 'failed' && statusResponse.registrationState && statusResponse.registrationState.errorCode === 400209) {
throw new StatusError('The device may be unassociated or blocked', 403);
} else {
throw new Error('Unknown server response');
}
}
throw new Error('Registration was not successful after maximum number of attempts');
} catch (e) {
throw new StatusError(`Unable to register device ${deviceId}: ${e.message}`, e.statusCode);
}
}
async function getRegistrationSasToken(context, deviceId) {
const uri = encodeURIComponent(`${context.idScope}/registrations/${deviceId}`);
const ttl = Math.round(Date.now() / 1000) + registrationSasTtl;
const signature = crypto.createHmac('sha256', new Buffer(await getDeviceKey(context, deviceId), 'base64'))
.update(`${uri}\n${ttl}`)
.digest('base64');
return`SharedAccessSignature sr=${uri}&sig=${encodeURIComponent(signature)}&skn=registration&se=${ttl}`;
}
/**
* Computes a derived device key using the primary key.
*/
async function getDeviceKey(context, deviceId) {
if (deviceCache[deviceId] && deviceCache[deviceId].deviceKey) {
return deviceCache[deviceId].deviceKey;
}
const key = crypto.createHmac('SHA256', Buffer.from(await context.getSecret(context, context.primaryKeyUrl), 'base64'))
.update(deviceId)
.digest()
.toString('base64');
deviceCache[deviceId].deviceKey = key;
return key;
}

11
Azure_function/package-lock.json сгенерированный Normal file
Просмотреть файл

@ -0,0 +1,11 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"node-fetch": {
"version": "2.6.1",
"resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.6.1.tgz",
"integrity": "sha512-V4aYg89jEoVRxRb2fJdAg8FHvI7cEyYdVAh94HH0UIK8oJxUfkjlDQN9RbMx+bEjP7+ggMiFRprSti032Oipxw=="
}
}
}