[event-hubs] adds IoT Hub connection string conversion with websockets sample (#16589)
As requested by https://github.com/Azure/azure-sdk-for-js/pull/7060#issuecomment-887480513, this PR adds a sample for converting an IoT Hub connection string to an Event Hubs-compatible connection string using web sockets. The main difference is that websocket options need to be specified, and the port used to connect changed from 5671 to 443. /cc @wiegvlieg - let me know if this helps or if you have more questions about how this sample works.
This commit is contained in:
Родитель
5944d3ad9a
Коммит
b497c95e60
|
@ -95,6 +95,7 @@
|
|||
},
|
||||
"skip": [
|
||||
"iothubConnectionString.js",
|
||||
"iothubConnectionStringWebsockets.js",
|
||||
"useWithIotHub.js",
|
||||
"usingAadAuth.js"
|
||||
],
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT Licence.
|
||||
|
||||
/**
|
||||
* @summary Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WebSockets.
|
||||
*/
|
||||
|
||||
/*
|
||||
* The Event Hubs connection string is then used with the EventHubConsumerClient to receive events.
|
||||
*
|
||||
* More information about the built-in messaging endpoint can be found at:
|
||||
* https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin
|
||||
*/
|
||||
|
||||
import * as crypto from "crypto";
|
||||
import { Buffer } from "buffer";
|
||||
import { AmqpError, Connection, ReceiverEvents, parseConnectionString } from "rhea-promise";
|
||||
import rheaPromise from "rhea-promise";
|
||||
import { EventHubConsumerClient, earliestEventPosition } from "@azure/event-hubs";
|
||||
import WebSocket from "ws";
|
||||
|
||||
// Load the .env file if it exists
|
||||
import * as dotenv from "dotenv";
|
||||
dotenv.config();
|
||||
|
||||
/**
|
||||
* Type guard for AmqpError.
|
||||
* @param err - An unknown error.
|
||||
*/
|
||||
function isAmqpError(err: any): err is AmqpError {
|
||||
return rheaPromise.isAmqpError(err);
|
||||
}
|
||||
|
||||
const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || "";
|
||||
|
||||
// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens.
|
||||
function generateSasToken(
|
||||
resourceUri: string,
|
||||
signingKey: string,
|
||||
policyName: string,
|
||||
expiresInMins: number
|
||||
): string {
|
||||
resourceUri = encodeURIComponent(resourceUri);
|
||||
|
||||
const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60);
|
||||
const toSign = resourceUri + "\n" + expiresInSeconds;
|
||||
|
||||
// Use the crypto module to create the hmac.
|
||||
const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64"));
|
||||
hmac.update(toSign);
|
||||
const base64UriEncoded = encodeURIComponent(hmac.digest("base64"));
|
||||
|
||||
// Construct authorization string.
|
||||
return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts an IotHub Connection string into an Event Hubs-compatible connection string.
|
||||
* @param connectionString - An IotHub connection string in the format:
|
||||
* `"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
|
||||
* @returns An Event Hubs-compatible connection string in the format:
|
||||
* `"Endpoint=sb://<hostname>;EntityPath=<your-iot-hub>;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
|
||||
*/
|
||||
async function convertIotHubToEventHubsConnectionString(connectionString: string): Promise<string> {
|
||||
const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString<{
|
||||
HostName: string;
|
||||
SharedAccessKeyName: string;
|
||||
SharedAccessKey: string;
|
||||
}>(connectionString);
|
||||
|
||||
// Verify that the required info is in the connection string.
|
||||
if (!HostName || !SharedAccessKey || !SharedAccessKeyName) {
|
||||
throw new Error(`Invalid IotHub connection string.`);
|
||||
}
|
||||
|
||||
//Extract the IotHub name from the hostname.
|
||||
const [iotHubName] = HostName.split(".");
|
||||
|
||||
if (!iotHubName) {
|
||||
throw new Error(`Unable to extract the IotHub name from the connection string.`);
|
||||
}
|
||||
|
||||
// Generate a token to authenticate to the service.
|
||||
// The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens
|
||||
const token = generateSasToken(
|
||||
`${HostName}/messages/events`,
|
||||
SharedAccessKey,
|
||||
SharedAccessKeyName,
|
||||
5 // token expires in 5 minutes
|
||||
);
|
||||
|
||||
const connection = new Connection({
|
||||
transport: "tls",
|
||||
host: HostName,
|
||||
hostname: HostName,
|
||||
username: `${SharedAccessKeyName}@sas.root.${iotHubName}`,
|
||||
port: 443,
|
||||
reconnect: false,
|
||||
password: token,
|
||||
webSocketOptions: {
|
||||
webSocket: WebSocket,
|
||||
protocol: ["AMQPWSB10"],
|
||||
url: `wss://${HostName}:${443}/$servicebus/websocket`
|
||||
}
|
||||
});
|
||||
await connection.open();
|
||||
|
||||
// Create the receiver that will trigger a redirect error.
|
||||
const receiver = await connection.createReceiver({
|
||||
source: { address: `amqps://${HostName}/messages/events/$management` }
|
||||
});
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
receiver.on(ReceiverEvents.receiverError, (context) => {
|
||||
const error = context.receiver && context.receiver.error;
|
||||
if (isAmqpError(error) && error.condition === "amqp:link:redirect") {
|
||||
const hostname = error.info && error.info.hostname;
|
||||
if (!hostname) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(
|
||||
`Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}`
|
||||
);
|
||||
}
|
||||
} else {
|
||||
reject(error);
|
||||
}
|
||||
connection.close().catch(() => {
|
||||
/* ignore error */
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export async function main() {
|
||||
console.log(`Running iothubConnectionString sample`);
|
||||
|
||||
const eventHubsConnectionString = await convertIotHubToEventHubsConnectionString(
|
||||
"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"
|
||||
);
|
||||
|
||||
const consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString);
|
||||
|
||||
const subscription = consumerClient.subscribe(
|
||||
{
|
||||
// The callback where you add your code to process incoming events
|
||||
processEvents: async (events, context) => {
|
||||
for (const event of events) {
|
||||
console.log(
|
||||
`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`
|
||||
);
|
||||
}
|
||||
},
|
||||
processError: async (err, context) => {
|
||||
console.log(`Error on partition "${context.partitionId}" : ${err}`);
|
||||
}
|
||||
},
|
||||
{ startPosition: earliestEventPosition }
|
||||
);
|
||||
|
||||
// Wait for a bit before cleaning up the sample
|
||||
setTimeout(async () => {
|
||||
await subscription.close();
|
||||
await consumerClient.close();
|
||||
console.log(`Exiting iothubConnectionString sample`);
|
||||
}, 30 * 1000);
|
||||
}
|
||||
|
||||
main().catch((error) => {
|
||||
console.error("Error running sample:", error);
|
||||
});
|
|
@ -12,14 +12,15 @@ urlFragment: event-hubs-javascript
|
|||
|
||||
These sample programs show how to use the JavaScript client libraries for Azure Event Hubs in some common scenarios.
|
||||
|
||||
| **File Name** | **Description** |
|
||||
| --------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| [sendEvents.js][sendevents] | Demonstrates how to send events to an Event Hub. |
|
||||
| [receiveEvents.js][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. |
|
||||
| [usingAadAuth.js][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. |
|
||||
| [iothubConnectionString.js][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. |
|
||||
| [useWithIotHub.js][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. |
|
||||
| [websockets.js][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. |
|
||||
| **File Name** | **Description** |
|
||||
| ----------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
|
||||
| [sendEvents.js][sendevents] | Demonstrates how to send events to an Event Hub. |
|
||||
| [receiveEvents.js][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. |
|
||||
| [usingAadAuth.js][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. |
|
||||
| [iothubConnectionString.js][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. |
|
||||
| [iothubConnectionStringWebsockets.js][iothubconnectionstringwebsockets] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WbeSockets. |
|
||||
| [useWithIotHub.js][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. |
|
||||
| [websockets.js][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. |
|
||||
|
||||
## Prerequisites
|
||||
|
||||
|
@ -65,6 +66,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP
|
|||
[receiveevents]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/receiveEvents.js
|
||||
[usingaadauth]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/usingAadAuth.js
|
||||
[iothubconnectionstring]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionString.js
|
||||
[iothubconnectionstringwebsockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/iothubConnectionStringWebsockets.js
|
||||
[usewithiothub]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/useWithIotHub.js
|
||||
[websockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/javascript/websockets.js
|
||||
[apiref]: https://docs.microsoft.com/javascript/api/@azure/event-hubs
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT Licence.
|
||||
|
||||
/**
|
||||
* @summary Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WebSockets.
|
||||
*/
|
||||
|
||||
/*
|
||||
* The Event Hubs connection string is then used with the EventHubConsumerClient to receive events.
|
||||
*
|
||||
* More information about the built-in messaging endpoint can be found at:
|
||||
* https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin
|
||||
*/
|
||||
|
||||
const crypto = require("crypto");
|
||||
const { Buffer } = require("buffer");
|
||||
const { Connection, ReceiverEvents, parseConnectionString } = require("rhea-promise");
|
||||
const rheaPromise = require("rhea-promise");
|
||||
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
|
||||
const WebSocket = require("ws");
|
||||
|
||||
// Load the .env file if it exists
|
||||
const dotenv = require("dotenv");
|
||||
dotenv.config();
|
||||
|
||||
/**
|
||||
* Type guard for AmqpError.
|
||||
* @param err - An unknown error.
|
||||
*/
|
||||
function isAmqpError(err) {
|
||||
return rheaPromise.isAmqpError(err);
|
||||
}
|
||||
|
||||
const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || "";
|
||||
|
||||
// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens.
|
||||
function generateSasToken(resourceUri, signingKey, policyName, expiresInMins) {
|
||||
resourceUri = encodeURIComponent(resourceUri);
|
||||
|
||||
const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60);
|
||||
const toSign = resourceUri + "\n" + expiresInSeconds;
|
||||
|
||||
// Use the crypto module to create the hmac.
|
||||
const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64"));
|
||||
hmac.update(toSign);
|
||||
const base64UriEncoded = encodeURIComponent(hmac.digest("base64"));
|
||||
|
||||
// Construct authorization string.
|
||||
return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts an IotHub Connection string into an Event Hubs-compatible connection string.
|
||||
* @param connectionString - An IotHub connection string in the format:
|
||||
* `"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
|
||||
* @returns An Event Hubs-compatible connection string in the format:
|
||||
* `"Endpoint=sb://<hostname>;EntityPath=<your-iot-hub>;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
|
||||
*/
|
||||
async function convertIotHubToEventHubsConnectionString(connectionString) {
|
||||
const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString(
|
||||
connectionString
|
||||
);
|
||||
|
||||
// Verify that the required info is in the connection string.
|
||||
if (!HostName || !SharedAccessKey || !SharedAccessKeyName) {
|
||||
throw new Error(`Invalid IotHub connection string.`);
|
||||
}
|
||||
|
||||
//Extract the IotHub name from the hostname.
|
||||
const [iotHubName] = HostName.split(".");
|
||||
|
||||
if (!iotHubName) {
|
||||
throw new Error(`Unable to extract the IotHub name from the connection string.`);
|
||||
}
|
||||
|
||||
// Generate a token to authenticate to the service.
|
||||
// The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens
|
||||
const token = generateSasToken(
|
||||
`${HostName}/messages/events`,
|
||||
SharedAccessKey,
|
||||
SharedAccessKeyName,
|
||||
5 // token expires in 5 minutes
|
||||
);
|
||||
|
||||
const connection = new Connection({
|
||||
transport: "tls",
|
||||
host: HostName,
|
||||
hostname: HostName,
|
||||
username: `${SharedAccessKeyName}@sas.root.${iotHubName}`,
|
||||
port: 443,
|
||||
reconnect: false,
|
||||
password: token,
|
||||
webSocketOptions: {
|
||||
webSocket: WebSocket,
|
||||
protocol: ["AMQPWSB10"],
|
||||
url: `wss://${HostName}:${443}/$servicebus/websocket`
|
||||
}
|
||||
});
|
||||
await connection.open();
|
||||
|
||||
// Create the receiver that will trigger a redirect error.
|
||||
const receiver = await connection.createReceiver({
|
||||
source: { address: `amqps://${HostName}/messages/events/$management` }
|
||||
});
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
receiver.on(ReceiverEvents.receiverError, (context) => {
|
||||
const error = context.receiver && context.receiver.error;
|
||||
if (isAmqpError(error) && error.condition === "amqp:link:redirect") {
|
||||
const hostname = error.info && error.info.hostname;
|
||||
if (!hostname) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(
|
||||
`Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}`
|
||||
);
|
||||
}
|
||||
} else {
|
||||
reject(error);
|
||||
}
|
||||
connection.close().catch(() => {
|
||||
/* ignore error */
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function main() {
|
||||
console.log(`Running iothubConnectionString sample`);
|
||||
|
||||
const eventHubsConnectionString = await convertIotHubToEventHubsConnectionString(
|
||||
"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"
|
||||
);
|
||||
|
||||
const consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString);
|
||||
|
||||
const subscription = consumerClient.subscribe(
|
||||
{
|
||||
// The callback where you add your code to process incoming events
|
||||
processEvents: async (events, context) => {
|
||||
for (const event of events) {
|
||||
console.log(
|
||||
`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`
|
||||
);
|
||||
}
|
||||
},
|
||||
processError: async (err, context) => {
|
||||
console.log(`Error on partition "${context.partitionId}" : ${err}`);
|
||||
}
|
||||
},
|
||||
{ startPosition: earliestEventPosition }
|
||||
);
|
||||
|
||||
// Wait for a bit before cleaning up the sample
|
||||
setTimeout(async () => {
|
||||
await subscription.close();
|
||||
await consumerClient.close();
|
||||
console.log(`Exiting iothubConnectionString sample`);
|
||||
}, 30 * 1000);
|
||||
}
|
||||
|
||||
main().catch((error) => {
|
||||
console.error("Error running sample:", error);
|
||||
});
|
|
@ -12,14 +12,15 @@ urlFragment: event-hubs-typescript
|
|||
|
||||
These sample programs show how to use the TypeScript client libraries for Azure Event Hubs in some common scenarios.
|
||||
|
||||
| **File Name** | **Description** |
|
||||
| --------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| [sendEvents.ts][sendevents] | Demonstrates how to send events to an Event Hub. |
|
||||
| [receiveEvents.ts][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. |
|
||||
| [usingAadAuth.ts][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. |
|
||||
| [iothubConnectionString.ts][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. |
|
||||
| [useWithIotHub.ts][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. |
|
||||
| [websockets.ts][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. |
|
||||
| **File Name** | **Description** |
|
||||
| ----------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
|
||||
| [sendEvents.ts][sendevents] | Demonstrates how to send events to an Event Hub. |
|
||||
| [receiveEvents.ts][receiveevents] | Demonstrates how to use the EventHubConsumerClient to process events from all partitions of a consumer group in an Event Hub. |
|
||||
| [usingAadAuth.ts][usingaadauth] | Demonstrates how to instantiate EventHubsClient using AAD token credentials obtained from using service principal secrets. |
|
||||
| [iothubConnectionString.ts][iothubconnectionstring] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint. |
|
||||
| [iothubConnectionStringWebsockets.ts][iothubconnectionstringwebsockets] | Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WbeSockets. |
|
||||
| [useWithIotHub.ts][usewithiothub] | Demonstrates how to use the EventHubConsumerClient to receive messages from an IoT Hub. |
|
||||
| [websockets.ts][websockets] | Demonstrates how to connect to Azure Event Hubs over websockets to work over an HTTP proxy. |
|
||||
|
||||
## Prerequisites
|
||||
|
||||
|
@ -77,6 +78,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP
|
|||
[receiveevents]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/receiveEvents.ts
|
||||
[usingaadauth]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/usingAadAuth.ts
|
||||
[iothubconnectionstring]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/iothubConnectionString.ts
|
||||
[iothubconnectionstringwebsockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/iothubConnectionStringWebsockets.ts
|
||||
[usewithiothub]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/useWithIotHub.ts
|
||||
[websockets]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/v5/typescript/src/websockets.ts
|
||||
[apiref]: https://docs.microsoft.com/javascript/api/@azure/event-hubs
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT Licence.
|
||||
|
||||
/**
|
||||
* @summary Demonstrates how to convert an IoT Hub connection string to an Event Hubs connection string that points to the built-in messaging endpoint using WebSockets.
|
||||
*/
|
||||
|
||||
/*
|
||||
* The Event Hubs connection string is then used with the EventHubConsumerClient to receive events.
|
||||
*
|
||||
* More information about the built-in messaging endpoint can be found at:
|
||||
* https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin
|
||||
*/
|
||||
|
||||
import * as crypto from "crypto";
|
||||
import { Buffer } from "buffer";
|
||||
import { AmqpError, Connection, ReceiverEvents, parseConnectionString } from "rhea-promise";
|
||||
import rheaPromise from "rhea-promise";
|
||||
import { EventHubConsumerClient, earliestEventPosition } from "@azure/event-hubs";
|
||||
import WebSocket from "ws";
|
||||
|
||||
// Load the .env file if it exists
|
||||
import * as dotenv from "dotenv";
|
||||
dotenv.config();
|
||||
|
||||
/**
|
||||
* Type guard for AmqpError.
|
||||
* @param err - An unknown error.
|
||||
*/
|
||||
function isAmqpError(err: any): err is AmqpError {
|
||||
return rheaPromise.isAmqpError(err);
|
||||
}
|
||||
|
||||
const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || "";
|
||||
|
||||
// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens.
|
||||
function generateSasToken(
|
||||
resourceUri: string,
|
||||
signingKey: string,
|
||||
policyName: string,
|
||||
expiresInMins: number
|
||||
): string {
|
||||
resourceUri = encodeURIComponent(resourceUri);
|
||||
|
||||
const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60);
|
||||
const toSign = resourceUri + "\n" + expiresInSeconds;
|
||||
|
||||
// Use the crypto module to create the hmac.
|
||||
const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64"));
|
||||
hmac.update(toSign);
|
||||
const base64UriEncoded = encodeURIComponent(hmac.digest("base64"));
|
||||
|
||||
// Construct authorization string.
|
||||
return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts an IotHub Connection string into an Event Hubs-compatible connection string.
|
||||
* @param connectionString - An IotHub connection string in the format:
|
||||
* `"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
|
||||
* @returns An Event Hubs-compatible connection string in the format:
|
||||
* `"Endpoint=sb://<hostname>;EntityPath=<your-iot-hub>;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
|
||||
*/
|
||||
async function convertIotHubToEventHubsConnectionString(connectionString: string): Promise<string> {
|
||||
const { HostName, SharedAccessKeyName, SharedAccessKey } = parseConnectionString<{
|
||||
HostName: string;
|
||||
SharedAccessKeyName: string;
|
||||
SharedAccessKey: string;
|
||||
}>(connectionString);
|
||||
|
||||
// Verify that the required info is in the connection string.
|
||||
if (!HostName || !SharedAccessKey || !SharedAccessKeyName) {
|
||||
throw new Error(`Invalid IotHub connection string.`);
|
||||
}
|
||||
|
||||
//Extract the IotHub name from the hostname.
|
||||
const [iotHubName] = HostName.split(".");
|
||||
|
||||
if (!iotHubName) {
|
||||
throw new Error(`Unable to extract the IotHub name from the connection string.`);
|
||||
}
|
||||
|
||||
// Generate a token to authenticate to the service.
|
||||
// The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens
|
||||
const token = generateSasToken(
|
||||
`${HostName}/messages/events`,
|
||||
SharedAccessKey,
|
||||
SharedAccessKeyName,
|
||||
5 // token expires in 5 minutes
|
||||
);
|
||||
|
||||
const connection = new Connection({
|
||||
transport: "tls",
|
||||
host: HostName,
|
||||
hostname: HostName,
|
||||
username: `${SharedAccessKeyName}@sas.root.${iotHubName}`,
|
||||
port: 443,
|
||||
reconnect: false,
|
||||
password: token,
|
||||
webSocketOptions: {
|
||||
webSocket: WebSocket,
|
||||
protocol: ["AMQPWSB10"],
|
||||
url: `wss://${HostName}:${443}/$servicebus/websocket`
|
||||
}
|
||||
});
|
||||
await connection.open();
|
||||
|
||||
// Create the receiver that will trigger a redirect error.
|
||||
const receiver = await connection.createReceiver({
|
||||
source: { address: `amqps://${HostName}/messages/events/$management` }
|
||||
});
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
receiver.on(ReceiverEvents.receiverError, (context) => {
|
||||
const error = context.receiver && context.receiver.error;
|
||||
if (isAmqpError(error) && error.condition === "amqp:link:redirect") {
|
||||
const hostname = error.info && error.info.hostname;
|
||||
if (!hostname) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(
|
||||
`Endpoint=sb://${hostname}/;EntityPath=${iotHubName};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}`
|
||||
);
|
||||
}
|
||||
} else {
|
||||
reject(error);
|
||||
}
|
||||
connection.close().catch(() => {
|
||||
/* ignore error */
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export async function main() {
|
||||
console.log(`Running iothubConnectionString sample`);
|
||||
|
||||
const eventHubsConnectionString = await convertIotHubToEventHubsConnectionString(
|
||||
"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"
|
||||
);
|
||||
|
||||
const consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString);
|
||||
|
||||
const subscription = consumerClient.subscribe(
|
||||
{
|
||||
// The callback where you add your code to process incoming events
|
||||
processEvents: async (events, context) => {
|
||||
for (const event of events) {
|
||||
console.log(
|
||||
`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`
|
||||
);
|
||||
}
|
||||
},
|
||||
processError: async (err, context) => {
|
||||
console.log(`Error on partition "${context.partitionId}" : ${err}`);
|
||||
}
|
||||
},
|
||||
{ startPosition: earliestEventPosition }
|
||||
);
|
||||
|
||||
// Wait for a bit before cleaning up the sample
|
||||
setTimeout(async () => {
|
||||
await subscription.close();
|
||||
await consumerClient.close();
|
||||
console.log(`Exiting iothubConnectionString sample`);
|
||||
}, 30 * 1000);
|
||||
}
|
||||
|
||||
main().catch((error) => {
|
||||
console.error("Error running sample:", error);
|
||||
});
|
Загрузка…
Ссылка в новой задаче