* Remove testhub CLI utility from repo

* update pnpm-lock.yaml

* remove vestigal traces of testhub
This commit is contained in:
chradek 2021-04-02 11:44:19 -07:00 коммит произвёл GitHub
Родитель f3520b933c
Коммит aad8e1c1bc
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 1 добавлений и 1133 удалений

Просмотреть файл

@ -75,7 +75,6 @@ dependencies:
'@rush-temp/test-utils-multi-version': file:projects/test-utils-multi-version.tgz
'@rush-temp/test-utils-perfstress': file:projects/test-utils-perfstress.tgz
'@rush-temp/test-utils-recorder': file:projects/test-utils-recorder.tgz
'@rush-temp/testhub': file:projects/testhub.tgz
lockfileVersion: 5.2
packages:
/@azure/abort-controller/1.0.4:
@ -1261,16 +1260,6 @@ packages:
dev: false
resolution:
integrity: sha512-EyvT83ezOdec7BhDaEcsklWy7RSIdi6CNe95tmOAK0yx/Lm30C9K75snT3fYayK59ApC2oyW+rcHErdG05FHJA==
/@types/yargs-parser/20.2.0:
dev: false
resolution:
integrity: sha512-37RSHht+gzzgYeobbG+KWryeAW8J33Nhr69cjTqSYymXVZEN9NbRYWoYlRtDhHKPVT1FyNKwaTPC1NynKZpzRA==
/@types/yargs/15.0.13:
dependencies:
'@types/yargs-parser': 20.2.0
dev: false
resolution:
integrity: sha512-kQ5JNTrbDv3Rp5X2n/iUu37IJBDU2gsZ5R/g1/KHOOEc5IKfUFjXT6DENPGduh08I/pamwtEq4oul7gUqKTQDQ==
/@types/yauzl/2.9.1:
dependencies:
'@types/node': 10.17.55
@ -2071,14 +2060,6 @@ packages:
dev: false
resolution:
integrity: sha512-PYeGSEmmHM6zvoef2w8TPzlrnNpXIjTipYK780YswmIP9vjxmd6Y2a3CB2Ks6/AU8NHjZugXvo8w3oWM2qnwXA==
/cliui/6.0.0:
dependencies:
string-width: 4.2.2
strip-ansi: 6.0.0
wrap-ansi: 6.2.0
dev: false
resolution:
integrity: sha512-t6wbgtoCXvAzst7QgXxJYqPt0usEfbgQdftEPbLL/cvv6HPE5VgvqCuAIDR0NgU52ds6rFwqrgakNLrHEjCbrQ==
/cliui/7.0.4:
dependencies:
string-width: 4.2.2
@ -2349,10 +2330,6 @@ packages:
node: '>0.4.0'
resolution:
integrity: sha1-YfsWzcEnSzyayq/+n8ad+HIKK2Q=
/death/1.1.0:
dev: false
resolution:
integrity: sha1-AaqcQB7dknUFFEcLgmY5DGbGcxg=
/debounce/1.2.1:
dev: false
resolution:
@ -3230,15 +3207,6 @@ packages:
node: '>=6'
resolution:
integrity: sha512-1yD6RmLI1XBfxugvORwlck6f75tYL+iR0jqwsOrOxMZyGYqUuDhJ0l4AXdO1iX/FTs9cBAMEk1gWSEx1kSbylg==
/find-up/4.1.0:
dependencies:
locate-path: 5.0.0
path-exists: 4.0.0
dev: false
engines:
node: '>=8'
resolution:
integrity: sha512-PpOwAdQ/YlXQ2vj8a3h8IipDuYRi3wceVQQGYWxNINccq40Anw7BlsEXCMbt1Zt+OLA6Fq9suIpIWD0OsnISlw==
/flat-cache/3.0.4:
dependencies:
flatted: 3.1.1
@ -4743,14 +4711,6 @@ packages:
node: '>=6'
resolution:
integrity: sha512-7AO748wWnIhNqAuaty2ZWHkQHRSNfPVIsPIfwEOWO22AmaoVrWavlOcMR5nzTLNYvp36X220/maaRsrec1G65A==
/locate-path/5.0.0:
dependencies:
p-locate: 4.1.0
dev: false
engines:
node: '>=8'
resolution:
integrity: sha512-t7hw9pI+WvuwNJXwk5zVHpyhIqzg2qTlklJOf0mVxGSbe3Fp2VieZcduNYjaLDoy6p9uGpQEGWG87WpMKlNq8g==
/lodash.flattendeep/4.4.0:
dev: false
resolution:
@ -5491,14 +5451,6 @@ packages:
node: '>=6'
resolution:
integrity: sha512-x+12w/To+4GFfgJhBEpiDcLozRJGegY+Ei7/z0tSLkMmxGZNybVMSfWj9aJn8Z5Fc7dBUNJOOVgPv2H7IwulSQ==
/p-locate/4.1.0:
dependencies:
p-limit: 2.3.0
dev: false
engines:
node: '>=8'
resolution:
integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==
/p-try/1.0.0:
dev: false
engines:
@ -5592,12 +5544,6 @@ packages:
node: '>=4'
resolution:
integrity: sha1-zg6+ql94yxiSXqfYENe1mwEP1RU=
/path-exists/4.0.0:
dev: false
engines:
node: '>=8'
resolution:
integrity: sha512-ak9Qy5Q7jYb2Wwcey5Fpvg2KoAc/ZIhLSLOSBmRmygPsGwkVVt0fZa0qrtMz+m6tJTAHfZQ8FnmB4MG4LWy7/w==
/path-is-absolute/1.0.1:
dev: false
engines:
@ -7473,16 +7419,6 @@ packages:
node: '>=6'
resolution:
integrity: sha512-QC1/iN/2/RPVJ5jYK8BGttj5z83LmSKmvbvrXPNCLZSEb32KKVDJDl/MOt2N01qU2H/FkzEa9PKto1BqDjtd7Q==
/wrap-ansi/6.2.0:
dependencies:
ansi-styles: 4.3.0
string-width: 4.2.2
strip-ansi: 6.0.0
dev: false
engines:
node: '>=8'
resolution:
integrity: sha512-r6lPcBGxZXlIcymEu7InxDMhdW0KDxpLgoFLcguasxCaJ/SOIZwINatK9KY/tf+ZrlywOKU0UDj3ATXUBfxJXA==
/wrap-ansi/7.0.0:
dependencies:
ansi-styles: 4.3.0
@ -7614,15 +7550,6 @@ packages:
dev: false
resolution:
integrity: sha512-3lbsNRf/j+A4QuSZfDRA7HRSfWrzO0YjqTJd5kjAq37Zep1CEgaYmrH9Q3GwPiB9cHyd1Y1UwggGhJGoxipbzg==
/yargs-parser/18.1.3:
dependencies:
camelcase: 5.3.1
decamelize: 1.2.0
dev: false
engines:
node: '>=6'
resolution:
integrity: sha512-o50j0JeToy/4K6OZcaQmW6lyXXKhq7csREXcDwk2omFPJEwUNOVtJKvmDr9EI1fAJZUyZcRF7kxGBWmRXudrCQ==
/yargs-parser/20.2.7:
dev: false
engines:
@ -7654,24 +7581,6 @@ packages:
dev: false
resolution:
integrity: sha512-AX3Zw5iPruN5ie6xGRIDgqkT+ZhnRlZMLMHAs8tg7nRruy2Nb+i5o9bwghAogtM08q1dpr2LVoS8KSTMYpWXUw==
/yargs/15.4.1:
dependencies:
cliui: 6.0.0
decamelize: 1.2.0
find-up: 4.1.0
get-caller-file: 2.0.5
require-directory: 2.1.1
require-main-filename: 2.0.0
set-blocking: 2.0.0
string-width: 4.2.2
which-module: 2.0.0
y18n: 4.0.1
yargs-parser: 18.1.3
dev: false
engines:
node: '>=8'
resolution:
integrity: sha512-aePbxDmcYW++PaqBsJ+HYUFwCdv4LVvdnhBy78E57PIor8/OVvhMrADFFEDh8DHDFRv/O9i3lPhsENjO7QX0+A==
/yargs/16.2.0:
dependencies:
cliui: 7.0.4
@ -8364,7 +8273,7 @@ packages:
dev: false
name: '@rush-temp/container-registry'
resolution:
integrity: sha512-LcKNOTLCGxQWdRwcSE0WkAxDqTKPqLI8DbuFfQBA00tOSpKJp6PRoXCSquxgzk9Wy6PMyk0TrNN3iP94G3u+Qg==
integrity: sha512-JhtYDaf7z6RTEdl5ryiX5e+awRFuUAz93kZlWwHyjAlM9XmwIAFJAplLk08suRJiL7xBtGiU5/g9nonNK1giww==
tarball: file:projects/container-registry.tgz
version: 0.0.0
file:projects/core-amqp.tgz:
@ -10931,28 +10840,6 @@ packages:
integrity: sha512-+z4Le1dPD7946nToxQrse/mrMXuoJDo2Cko9iUDRTFxF2+1vs9a2WEawBV2iW4KckezsAyswjjxTrWYFl5Vyyg==
tarball: file:projects/test-utils-recorder.tgz
version: 0.0.0
file:projects/testhub.tgz:
dependencies:
'@azure/event-hubs': 2.1.4
'@types/node': 8.10.66
'@types/uuid': 8.3.0
'@types/yargs': 15.0.13
async-lock: 1.2.8
death: 1.1.0
debug: 4.3.1
eslint: 7.22.0
rhea: 1.0.24
rimraf: 3.0.2
tslib: 2.1.0
typescript: 4.2.3
uuid: 8.3.2
yargs: 15.4.1
dev: false
name: '@rush-temp/testhub'
resolution:
integrity: sha512-qfEqZt22JDp6lEcpOXiMFtpxUSlNncnRkW9Oq1DCOWgLDMxBiHC1aqBA/csglDxhZ1v86t26+8SegRID9UZfrQ==
tarball: file:projects/testhub.tgz
version: 0.0.0
registry: ''
specifiers:
'@rush-temp/abort-controller': file:./projects/abort-controller.tgz
@ -11031,4 +10918,3 @@ specifiers:
'@rush-temp/test-utils-multi-version': file:./projects/test-utils-multi-version.tgz
'@rush-temp/test-utils-perfstress': file:./projects/test-utils-perfstress.tgz
'@rush-temp/test-utils-recorder': file:./projects/test-utils-recorder.tgz
'@rush-temp/testhub': file:./projects/testhub.tgz

Просмотреть файл

@ -108,10 +108,6 @@
"name": "form-recognizer",
"path": "sdk/formrecognizer/ai-form-recognizer"
},
{
"name": "testhub",
"path": "sdk/eventhub/testhub"
},
{
"name": "identity",
"path": "sdk/identity/identity"

Просмотреть файл

@ -50,7 +50,6 @@ known_presence_issues:
- ["sdk/applicationinsights/applicationinsights-query/CHANGELOG.md", "#1583"]
- ["sdk/batch/batch/CHANGELOG.md", "#1583"]
- ["sdk/eventgrid/eventgrid/CHANGELOG.md", "#1583"]
- ["sdk/eventhub/testhub/CHANGELOG.md", "#1583"]
- ["sdk/graphrbac/graph/CHANGELOG.md", "#1583"]
- ["sdk/operationalinsights/loganalytics/CHANGELOG.md", "#1583"]
- ["sdk/servicefabric/servicefabric/CHANGELOG.md", "#1583"]
@ -122,7 +121,6 @@ known_content_issues:
- ["sdk/eventhub/event-processor-host/README.md",  "#1583"]
- ["sdk/eventhub/README.md",  "#1583"]
- ["sdk/eventhub/mock-hub/README.md",  "#1583"]
- ["sdk/eventhub/testhub/README.md",  "#1583"]
- ["sdk/graphrbac/graph/README.md",  "#1583"]
- ["sdk/keyvault/README.md",  "#1583"]
- ["sdk/operationalinsights/loganalytics/README.md",  "#1583"]
@ -144,5 +142,4 @@ known_content_issues:
package_indexing_exclusion_list:
- "@azure/template"
- "azure-sp"
- "testhub"
- "azure-sdk-for-js"

Просмотреть файл

@ -497,11 +497,6 @@
"projectFolder": "sdk/eventhub/mock-hub",
"versionPolicyName": "utility"
},
{
"packageName": "testhub",
"projectFolder": "sdk/eventhub/testhub",
"versionPolicyName": "utility"
},
{
"packageName": "@azure/identity",
"projectFolder": "sdk/identity/identity",

27
sdk/eventhub/.vscode/launch.json поставляемый
Просмотреть файл

@ -1,27 +0,0 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "testhub",
"program": "${workspaceFolder}/testhub/dist/testhub/cli.js",
"args": [
"send",
"-n",
"test1",
"-c",
"Endpoint= . . . ",
"-b",
"1",
"-s",
"300",
"-p",
"0"
]
}
]
}

Просмотреть файл

@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright (c) 2020 Microsoft
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

Просмотреть файл

@ -1,114 +0,0 @@
## Installing node.js on your box
You can install node.js from [here](https://nodejs.org/en/). For installing node.js on a linux machine follow the instructions over [here](https://nodejs.org/en/download/package-manager/).
## Installing this cli for testing
```bash
git clone https://github.com/Azure/azure-sdk-for-js
cd sdk/eventhub/testhub
npm install -g .
```
## Hub Testing CLI
`testhub` is a command line tool to help test Event Hub sending and receiving.
### Usage
When simply executing `testhub`, you will be greeted with a help page. `help` can be called on each command.
```bash
testhub --help
testhub [command]
Commands:
testhub receive Sends messages to an eventhub.
testhub send Sends messages to an eventhub.
Options:
--version Show version number [boolean]
-h, --help Show help [boolean]
-c, --conn-str EventHub connection string. [string]
-n, --hub Name of the EventHub. [string] [required]
-a, --address Address URI to the EventHub entity. [string]
-k, --key-name SAS key-name for the EventHub. [string]
-v, --key SAS key for the key-name. [string]
```
#### Send
- Send with a connection string.
```bash
$ testhub send -c yourConnectionString
```
- Send will by default send 1 messages of 256 bytes.
```bash
$ testhub send -a yourNamespace -n yourHub -k yourKeyName -v yourKey
```
- You can specify more messages by using `-b` and `-s`.
```bash
$ testhub send -c yourConnectionString -b 100 -s 100
```
- To cancel press `Ctrl + c`.
```bash
testhub send --help
testhub send
Sends messages to an eventhub.
Options:
--version Show version number [boolean]
-h, --help Show help [boolean]
-c, --conn-str EventHub connection string. [string]
-n, --hub Name of the EventHub. [string] [required]
-a, --address Address URI to the EventHub entity. [string]
-k, --key-name SAS key-name for the EventHub. [string]
-v, --key SAS key for the key-name. [string]
-b, --msg-group Number of events to group/batch. [number] [default: 1]
-t, --msg-count Number of events to send in one iteration.
[number] [default: 1000]
-s, --msg-size size in bytes for each event. [number] [default: 256]
-p, --partition-id The partitionId that the sender should send the event to.
[string] [default: "0"]
-w, --wait Number of seconds to sleep. [number] [default: 0]
-i, --iterations Number of iterations to repeat the process of sending
messages. For sending messages forever, provide a
value less than 1.
[number] [default: 1]
```
#### Receive
- Use a connection string to receive messages.
```bash
$ testhub receive -c yourConnectionString
```
- Listen for messages on all partitions of a given Event Hub.
```bash
$ testhub receive -a yourNamespace -n yourHub -k yourKeyName -v yourKey
```
- To cancel press `Ctrl + c`.
```bash
$ testhub receive --help
testhub receive
Receives messages from an eventhub.
Options:
--version Show version number [boolean]
-h, --help Show help [boolean]
-c, --conn-str EventHub connection string. [string]
-n, --hub Name of the EventHub. [string] [required]
-a, --address Address URI to the EventHub entity. [string]
-k, --key-name SAS key-name for the EventHub. [string]
-v, --key SAS key for the key-name. [string]
-d, --duration The value must be in seconds. Receive messages for the
specified duration. Useful for benchmark testing.
[number]
-p, --partitions Comma seperated partition IDs. [string] [default: "0"]
-g, --consumer Consumer group name [string] [default: "$default"]
-o, --offset Starting offset [string] [default: "-1"]
-f, --full-event-data Display the complete EventData object.
```
#### Debug
- If you would like to see more in-depth information about what is happening, `export DEBUG=azure*` or `export DEBUG=azure*,rhea*`.
![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-js%2Fsdk%2Feventhub%2Ftesthub%2FREADME.png)

Просмотреть файл

@ -1,85 +0,0 @@
#!/usr/bin/env node
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
/* tslint:disable */
import * as yargs from "yargs";
const CtrlC = require("death");
import { cache } from "./commands/sendReceive";
import { partitionCount, uberStartTime, startTime } from "./commands/receive";
import { getCurrentCommand } from "./utils/util";
yargs
.version("0.1.0")
.commandDir("./commands")
.strict()
.option("h", { alias: "help" })
.option("c", {
alias: "conn-str",
describe: "EventHub connection string.",
string: true
})
.option("n", {
alias: "hub",
describe: "Name of the EventHub.",
demandOption: true,
string: true
})
.option("a", {
alias: "address",
describe: "Address URI to the EventHub entity.",
string: true
})
.option("k", {
alias: "key-name",
describe: "SAS key-name for the EventHub.",
string: true
})
.option("e", {
alias: "client-pool",
describe: "Number of clients to be created",
default: 1,
number: true
})
.option("v", {
alias: "key",
describe: "SAS key for the key-name.",
string: true
})
.conflicts({
"c": ["a", "k", "v"]
})
.global(["h", "c", "n", "a", "k", "v", "l"])
.help()
.argv;
if (yargs.argv._.length === 0 && yargs.argv.h === false) {
yargs.coerce('help', function (arg) { return true; }).argv;
}
// if (!process.env.DEBUG) process.env.DEBUG = "azure*,rhea*,-rhea:raw,-rhea:message,-azure:amqp-common:datatransformer,-rhea:frame";
CtrlC((signal, err) => {
if (getCurrentCommand() === "send-receive") {
console.log("\nstats:");
console.log("---------------------");
console.log(" messageId | message ");
console.log("---------------------");
console.log("%o", cache);
console.log("---------------------");
} else if (getCurrentCommand() === "receive") {
console.log("\nstats:");
console.log("---------------------------------------------------------");
console.log(" PartitionId | Received Message Count | messages/second ");
console.log("---------------------------------------------------------");
for (const key in partitionCount) {
const count = partitionCount[key].currCount;
const duration = (partitionCount[key].currTimestamp - (startTime || uberStartTime)) / 1000;
const rate = count / duration;
console.log(` ${key} | ${count} | ${rate} `);
}
console.log("---------------------------------------------------------");
}
process.exit();
});

Просмотреть файл

@ -1,174 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import { CommandBuilder } from "yargs";
import {
EventProcessorHost, OnReceivedMessage, PartitionContext, OnReceivedError
} from "@azure/event-processor-host";
import { log, setCurrentCommand } from "../utils/util";
import { EventHubClient, EventPosition, EventData, Dictionary } from "@azure/event-hubs";
export const command = "eph";
export const describe = "Starts an eph to receive messages.";
export const builder: CommandBuilder = {
s: {
alias: "storage-str",
describe: "Azure storage connection string.",
required: true,
string: true
},
p: {
alias: "host-prefix",
describe: "EPH name prefix",
string: true,
required: true
},
g: {
alias: "consumer",
describe: "Consumer group name",
default: "$default",
string: true
},
o: {
alias: "offset",
describe: "Starting offset",
default: "-1",
string: true
},
t: {
alias: "hosts",
describe: "Number of hosts",
default: 1,
number: true
},
d: {
alias: "lease-duration",
number: true,
default: 30,
describe: "Lease duration in seconds."
},
r: {
alias: "lease-renew-interval",
number: true,
default: 10,
describe: "Lease renew interval in seconds."
},
l: {
alias: "lease-container-name",
describe: "The name of the lease container",
default: "my-container",
string: true
}
};
function validateArgs(argv: any) {
if (!argv) {
throw new Error(`argv cannot be null or undefined.`);
}
if (!argv.connStr && (!argv.key || !argv.keyName || !argv.address)) {
throw new Error(`Either provide --conn-str OR (--address "sb://{yournamespace}.servicebus.windows.net" --key-name "<shared-access-key-name>" --key "<shared-access-key-value>")`);
}
if (!argv.storageStr) {
throw new Error(`-s <storage connection string> is required.`);
}
}
export async function handler(argv: any): Promise<void> {
setCurrentCommand(command);
const ephCache: Dictionary<EventProcessorHost> = {};
try {
validateArgs(argv);
const consumerGroup = argv.consumer;
const offset = argv.offset;
let connectionString: string = argv.connStr;
const storageStr: string = argv.storageStr;
const hub: string = argv.hub;
const ephCount: number = argv.hosts;
const leaseContainerName = argv.leaseContainerName;
const hostPrefix = argv.hostPrefix;
const leaseDuration = argv.leaseDuration;
const leaseRenewInterval = argv.leaseRenewInterval;
if (!connectionString) {
let address = argv.address;
if (!address.endsWith("/")) address += "/";
if (!address.startsWith("sb://")) address = "sb://" + address;
connectionString = `Endpoint=${address};SharedAccessKeyName=${argv.keyName};SharedAccessKey=${argv.key}`;
}
const client = EventHubClient.createFromConnectionString(connectionString, hub);
const ids = await client.getPartitionIds();
await client.close();
log("Total number of partitions: %d", ids.length);
log("Creating %d EPH(s).", ephCount);
for (let i = 0; i < ephCount; i++) {
const hostName = `${hostPrefix}-${i + 1}`;
ephCache[hostName] = EventProcessorHost.createFromConnectionString(hostName,
storageStr,
leaseContainerName,
connectionString,
{
leaseDuration: leaseDuration,
leaseRenewInterval: leaseRenewInterval,
eventHubPath: hub,
consumerGroup: consumerGroup,
initialOffset: offset === "-1" ? EventPosition.fromOffset("-1") : undefined,
onEphError: (error) => {
log(">>>>>>> [%s] Error: %O", hostName, error);
}
});
}
const startedEphs: Array<Promise<void>> = [];
for (let ephName of Object.keys(ephCache)) {
// Message handler
const eph = ephCache[ephName];
let count: number = 0;
const onMessage: OnReceivedMessage = async (context: PartitionContext, data: EventData) => {
count++;
// log("##### [%s] %d - Rx message from partition '%s'.", eph.hostName, count, context.partitionId);
// Checkpointing every 1000th event
if (count % 1000 === 0) {
try {
log("##### [%s] %d - Checkpointing message from partition '%s', with offset " +
"'%s', sequenceNumber %d.", eph, count, context.partitionId, data.offset,
data.sequenceNumber);
log("***** [%s] Number of partitions: %O", eph.hostName, eph.receivingFromPartitions.length);
log("***** [%s] EPH is currently receiving messages from partitions: %s, total number %d.",
eph.hostName, eph.receivingFromPartitions.toString(), eph.receivingFromPartitions.length);
await context.checkpoint();
log("$$$$ [%s] Successfully checkpointed message number %d for partition '%s'",
eph.hostName, count, context.partitionId);
} catch (err) {
log(">>>>>>> [%s] An error occurred while checkpointing msg number %d: %O",
eph.hostName, count, err);
}
}
};
// Error handler
const onError: OnReceivedError = (error) => {
log(">>>>> [%s] Received Error: %O", eph.hostName, error);
};
log(">>>>>> Starting the EPH - %s", eph.hostName);
startedEphs.push(eph.start(onMessage, onError));
}
await Promise.all(startedEphs);
setInterval(() => {
log("Performing the task every 60 seconds..");
const ephNames = Object.keys(ephCache);
log(">>> Total number of ephs: %d", ephNames.length);
for (let ephName of ephNames) {
const eph = ephCache[ephName];
const partitions = eph.receivingFromPartitions;
log("[%s] Currently receiving from partitions: %s", eph.hostName, partitions.toString());
log("[%s] Number of partitions: %d", eph.hostName, partitions.length);
}
}, 60000);
} catch (err) {
throw err;
}
}

Просмотреть файл

@ -1,155 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import { CommandBuilder } from "yargs";
import { EventHubClient, EventPosition, EventData } from "@azure/event-hubs";
import { log, setCurrentCommand } from "../utils/util";
export const command = "receive";
export const describe = "Receives messages from an eventhub.";
const logFrequency = 60000;
export const builder: CommandBuilder = {
d: {
alias: "duration",
describe: "The value must be in seconds. Receive messages for the specified duration. Useful for benchmark testing.",
number: true
},
p: {
alias: "partitions",
describe: "Comma seperated partition IDs.",
default: "0",
string: true,
coerce: ((arg: any) => {
if (typeof arg === "string")
return arg.split(",").map((x) => { return x.trim() });
else
return arg;
})
},
g: {
alias: "consumer",
describe: "Consumer group name",
default: "$default",
string: true
},
o: {
alias: "offset",
describe: "Starting offset",
default: "-1",
string: true
},
f: {
alias: "full-event-data",
describe: "Display the complete EventData object.",
default: false,
boolean: true
}
};
interface CountEntry {
currCount: number;
prevCount: number;
prevTimestamp: number;
currTimestamp: number;
timer?: NodeJS.Timer;
}
export const partitionCount: { [x: string]: CountEntry } = {};
export const uberStartTime = Date.now();
export let startTime: number;
function validateArgs(argv: any) {
if (!argv) {
throw new Error(`argv cannot be null or undefined.`);
}
if (!argv.connStr && (!argv.key || !argv.keyName || !argv.address)) {
throw new Error(`Either provide --conn-str OR (--address "sb://{yournamespace}.servicebus.windows.net" --key-name "<shared-access-key-name>" --key "<shared-access-key-value>")`);
}
}
export async function handler(argv: any): Promise<void> {
setCurrentCommand(command);
try {
validateArgs(argv);
let partitionIds = argv.partitions;
const consumerGroup = argv.consumer;
const offset = argv.offset;
const duration = argv.duration;
let client: EventHubClient;
let connectionString = argv.connStr;
if (!connectionString) {
let address = argv.address;
if (!address.endsWith("/")) address += "/";
if (!address.startsWith("sb://")) address = "sb://" + address;
connectionString = `Endpoint=${address};SharedAccessKeyName=${argv.keyName};SharedAccessKey=${argv.key}`;
}
client = EventHubClient.createFromConnectionString(connectionString, argv.hub);
if (!partitionIds) {
partitionIds = await client.getPartitionIds();
}
log("PartitionIds in the eventhub '%s' are: ", argv.hub, partitionIds);
startTime = Date.now();
log("Start time for receiving messages is: %s", startTime);
if (duration) {
log(">>>>>>>>>>>> Performance benchmark mode. <<<<<<<<<<<<<<<<");
log("Will be receiving messages only from partition: '0'.");
log(`Created Receiver for partition: "0" in consumer group: "${consumerGroup}" in event hub "${argv.hub}".`);
let datas = await client.receiveBatch("0", 500000, duration, { consumerGroup: consumerGroup, eventPosition: EventPosition.fromOffset(offset, true) });
log(`Received ${datas.length} messages in ${duration} seconds @ ${Math.floor(datas.length / duration)} messages/second.`);
} else {
for (let id of partitionIds) {
log(`Created Receiver: for partition: "${id}" in consumer group: "${consumerGroup}" in event hub "${argv.hub}".`);
const initialTS = Date.now();
partitionCount[id] = { prevCount: 0, currCount: 0, timer: undefined, prevTimestamp: initialTS, currTimestamp: initialTS };
const messageRate = () => {
const prevCount = partitionCount[id].prevCount;
const currCount = partitionCount[id].currCount;
const currTimestamp = partitionCount[id].currTimestamp;
let duration = (currTimestamp - startTime) / 1000;
if (prevCount !== currCount) {
partitionCount[id].prevCount = currCount;
partitionCount[id].prevTimestamp = currTimestamp;
} else {
duration = (partitionCount[id].prevTimestamp - startTime) / 1000;
log("No new messages have been received since '%s'.", new Date(partitionCount[id].prevTimestamp).toISOString());
}
log(`Received ${currCount} messages from partition "${id}" in ${duration} seconds ` +
`@ ${Math.floor(currCount / duration)} messages/second.`);
};
const onMessage = (m: EventData) => {
const ts = Date.now();
if (m.sequenceNumber && m.sequenceNumber !== partitionCount[id].currCount) {
log(`missed a message: seq: ${m.sequenceNumber}, count: ${partitionCount[id].currCount}`);
}
partitionCount[id].currCount += 1;
partitionCount[id].currTimestamp = ts;
if (partitionCount[id].timer == undefined) {
partitionCount[id].prevTimestamp = ts;
partitionCount[id].timer = setInterval(messageRate, logFrequency);
}
if (argv.fullEventData) {
log("Corresponding EventData object: %o", m);
}
};
const onError = (err: any) => {
if (partitionCount[id].timer != undefined) {
clearInterval(partitionCount[id].timer as NodeJS.Timer);
partitionCount[id].timer = undefined;
}
log("^^^^^^^^^^ An error occured with the receiver: %o", err);
};
client.receive(id, onMessage, onError, { consumerGroup: consumerGroup, eventPosition: EventPosition.fromOffset(offset, true) });
}
}
log("Started receiving messages from offset: '%s'.", offset);
} catch (err) {
return Promise.reject(err);
}
}

Просмотреть файл

@ -1,178 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import { CommandBuilder } from "yargs";
import { EventHubClient, EventData, delay } from "@azure/event-hubs";
import { log, setCurrentCommand } from "../utils/util";
export const command = "send";
export const describe = "Sends messages to an eventhub.";
export const builder: CommandBuilder = {
b: {
alias: "msg-group",
describe: "Number of events to group/batch.",
default: 1,
number: true
},
t: {
alias: "msg-count",
describe: "Number of events to send in one iteration.",
default: 1000,
number: true
},
s: {
alias: "msg-size",
describe: "size in bytes for each event.",
default: 256,
number: true
},
p: {
alias: "partition-id",
describe: "The partitionId that the sender should send the event to. It can be a " +
"specific partition '0' or an inclusive range '0-127'.",
string: true,
},
w: {
alias: "wait",
describe: "Number of seconds to sleep.",
default: 0,
number: true
},
i: {
alias: "iterations",
describe: "Number of iterations to repeat the process of sending messages. For sending " +
"messages forever, provide a value less than 1.",
number: true,
default: 1
}
};
function validateArgs(argv: any): void {
if (!argv) {
throw new Error(`argv cannot be null or undefined.`);
}
if (!argv.connStr && (!argv.key || !argv.keyName || !argv.address)) {
throw new Error(`Either provide --conn-str OR (--address "sb://{yournamespace}.servicebus.` +
`windows.net" --key-name "<shared-access-key-name>" --key "<shared-access-key-value>")`);
}
}
export async function handler(argv: any): Promise<void> {
setCurrentCommand(command);
validateArgs(argv);
let connectionString = argv.connStr;
if (!connectionString) {
let address = argv.address;
if (!address.endsWith("/")) address += "/";
if (!address.startsWith("sb://")) address = "sb://" + address;
connectionString = `Endpoint=${address};SharedAccessKeyName=${argv.keyName};SharedAccessKey=${argv.key}`;
}
const msgCount = argv.msgCount;
const msgGroup = argv.msgGroup;
const msgSize = argv.msgSize;
const partitionId: string = argv.partitionId;
const iterations = argv.iterations;
const wait = argv.wait;
const clientPool = argv.clientPool;
const iterationValue = iterations < 1 ? Infinity : iterations;
let partitionIds: number[] = [];
console.log("client pool - %d", clientPool);
console.log("partitionId - %s", partitionId);
console.log("msg count - %d", msgCount);
console.log("msg group (batch size) - %d", msgGroup);
console.log("msg size - %d", msgSize);
console.log("iterations - %s", iterationValue);
console.log("wait time between iterations - %d", wait);
let clients: EventHubClient[] = [];
for (let c = 0; c < clientPool; c++) {
if (partitionId != undefined) {
const partitionIdRange = partitionId.split("-").map((x) => Number.parseInt(x));
partitionIds = Array.from(
new Array(partitionIdRange[partitionIdRange.length - 1] - partitionIdRange[0] + 1),
(val, index) => index + partitionIdRange[0]);
log("[Client-%d] Sending messages to partitionId: ", c, partitionIds);
} else {
log("[Client-%d] Sending messages in a round robin fashion to all the partitions.", c);
}
clients.push(EventHubClient.createFromConnectionString(connectionString, argv.hub));
}
const msgBody = Buffer.from("Z".repeat(msgSize));
const obj: EventData = { body: msgBody };
let datas: EventData[] = [];
let count = 0;
if (msgGroup > 1) { // send batch
for (count = 0; count < msgGroup; count++) {
datas.push(obj);
}
}
const msgToSend: EventData | EventData[] = datas.length ? datas : obj;
const clientSendMessage = async (client: EventHubClient, index: number, partitionId?: string | number) => {
try {
for (let i = 0; i < iterationValue; i++) {
const startTime = Date.now();
for (let j = 0; j < msgCount; j++) {
try {
if (partitionId) {
log("[Client-%d] [iteration-%d] [partition-%d] message number %d.", index, i, partitionId, j + 1);
} else {
log("[Client-%d] [iteration-%d] message number %d.", index, i, j + 1);
}
await sendMessage(client, index, msgToSend, partitionId);
} catch (err) {
if (partitionId) {
log("[Client-%d] [iteration-%d] [partition-%d] message number %d not successful.", index, i, partitionId, j + 1);
} else {
log("[Client-%d] [iteration-%d] message number %d not successful.", index, i, j + 1);
}
}
}
const totalTime = (Date.now() - startTime) / 1000;
const totalMsgs = msgCount * msgGroup;
log("[Client-%d] [iteration-%d] total time in seconds: %d, number of messages sent: %d, messages sent/second: %d, size (in bytes) of each message: %d.", index, i,
totalTime, totalMsgs, totalMsgs / totalTime, msgGroup * msgBody.length);
if (wait > 0) {
if (i + 1 < iterationValue) {
log("[Client-%d] #################### Waiting for %d seconds, before starting iteration: %d ########################", index, wait, i + 1);
await delay(wait * 1000);
} else {
log("[Client-%d] #################### All iterations complete ########################", index);
}
}
}
} catch (err) {
log(err);
}
}
if (partitionIds.length) {
for (let j = 0; j < partitionIds.length; j++) {
for (let i = 0; i < clients.length; i++) {
clientSendMessage(clients[i], i, partitionIds[j]);
}
}
} else {
for (let i = 0; i < clients.length; i++) {
clientSendMessage(clients[i], i);
}
}
}
async function sendMessage(client: EventHubClient, index: number, data: EventData | EventData[], partitionId?: string | number): Promise<any> {
if (Array.isArray(data)) {
try {
return await client.sendBatch(data, partitionId);
} catch (err) {
log("[Client-%d] An error occurred while sending a batch message. ", index, err);
}
} else {
try {
return await client.send(data, partitionId);
} catch (err) {
log("[Client-%d] An error occurred while sending a message. ", index, err);
}
}
}

Просмотреть файл

@ -1,123 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import { CommandBuilder } from "yargs";
import { EventHubClient, EventPosition, EventData, delay } from "@azure/event-hubs";
import { log, setCurrentCommand, randomNumberFromInterval } from "../utils/util";
import { v4 as uuid } from "uuid";
export const command = "send-receive";
export const describe = "Sends a message to an event hub partition and receives the message " +
"from that eventhub partition, thus verifying that the sent message was received successfully.";
export const builder: CommandBuilder = {
p: {
alias: "partitionId",
describe: "The partition id to send the message and receive from.",
default: "0",
string: true
},
g: {
alias: "consumer",
describe: "Consumer group name.",
default: "$default",
string: true
},
w: {
alias: "maxwait",
describe: "Max time in seconds to wait before sending the next message. " +
"A random number between 5 and the provided number",
default: 2000,
number: true
},
};
function validateArgs(argv: any) {
if (!argv) {
throw new Error(`argv cannot be null or undefined.`);
}
if (!argv.connStr && (!argv.key || !argv.keyName || !argv.address)) {
throw new Error(`Either provide --conn-str OR (--address "sb://{yournamespace}.servicebus.windows.net" --key-name "<shared-access-key-name>" --key "<shared-access-key-value>")`);
}
}
export const cache: { [x: string]: EventData } = {};
export const minWaitTime = 5;
export async function handler(argv: any): Promise<void> {
setCurrentCommand(command);
try {
validateArgs(argv);
const partitionId: string = argv.partitionId;
const consumerGroup: string = argv.consumer;
const maxWaitTime: number = argv.maxwait;
console.log("consumer group - %s", consumerGroup);
console.log("partitionId - %s", partitionId);
console.log("max wait time (seconds) - %d", maxWaitTime);
let client1: EventHubClient;
let client2: EventHubClient;
let connectionString = argv.connStr;
if (!connectionString) {
let address = argv.address;
if (!address.endsWith("/")) address += "/";
if (!address.startsWith("sb://")) address = "sb://" + address;
connectionString = `Endpoint=${address};SharedAccessKeyName=${argv.keyName};SharedAccessKey=${argv.key}`;
}
client1 = EventHubClient.createFromConnectionString(connectionString, argv.hub);
client2 = EventHubClient.createFromConnectionString(connectionString, argv.hub);
log(`Created Receiver: for partition: "${partitionId}" in consumer group: "${consumerGroup}" in event hub "${argv.hub}".`);
log(`Created Sender: for partition: "${partitionId}" in event hub "${argv.hub}".`);
const onMessage = (m: EventData) => {
const mid = m.properties!.message_id as string;
const num = m.sequenceNumber;
if (mid && !cache[mid]) {
const msg = `Error message with seq num ${num} and id '${mid}' not found in cache..`;
log(">>>> %o", new Error(msg));
} else {
log("Received message with seq num %d and id '%s' and it is present in cache.", num, mid);
delete cache[mid];
}
};
const onError = (err: any) => {
log("^^^^^^^^^^ An error occured with the receiver: %o", err);
};
const now = Date.now();
client1.receive(partitionId, onMessage, onError, { consumerGroup: consumerGroup, eventPosition: EventPosition.fromEnqueuedTime(now) });
log("Started receiving messages from enqueued time : '%s'.", new Date(now).toString());
await delay(3000);
while (true) {
const waitTime = randomNumberFromInterval(minWaitTime, maxWaitTime) * 1000;
const nextIterationAt = Date.now() + waitTime;
const messageId = uuid();
const m: EventData = {
body: "Hello World Event Hub " + new Date().toString(),
properties: {
message_id: messageId
}
};
if (cache[messageId]) {
log("Looks like message with id '%s' is already present in the cache %o", messageId, cache);
}
cache[messageId] = m;
try {
const delivery = await client2.send(m, partitionId);
log("Sent message with id '%s'. Delivery id '%d', tag '%s'.",
messageId, delivery.id, delivery.tag.toString());
} catch (err) {
log("An error occurred while sending the message with id '%s', %o", messageId, err);
}
log("Sleeping for %d seconds. Next message will be sent around %s",
waitTime / 1000, new Date(nextIterationAt).toString());
try {
await delay(waitTime);
} catch (err) {
log("An error occurred while sleeping for %d milliseconds, %o", waitTime, err);
}
}
} catch (err) {
return Promise.reject(err);
}
}

Просмотреть файл

@ -1,56 +0,0 @@
{
"name": "testhub",
"version": "0.1.0",
"license": "MIT",
"private": true,
"engine": {
"node": ">=8.0.0"
},
"bin": {
"testhub": "./dist/cli.js"
},
"scripts": {
"audit": "node ../../../common/scripts/rush-audit.js && rimraf node_modules package-lock.json && npm i --package-lock-only 2>&1 && npm audit",
"build:test": "npm run build",
"build": "tsc",
"check-format": "echo skipped",
"clean": "rimraf dist dist-esm test-dist typings *.tgz *.log",
"execute:samples": "echo skipped",
"format": "echo skipped",
"integration-test:browser": "echo skipped",
"integration-test:node": "echo skipped",
"integration-test": "npm run integration-test:node && npm run integration-test:browser",
"lint:fix": "echo skipped",
"lint": "echo skipped",
"pack": "npm pack 2>&1",
"prebuild": "npm run clean",
"prepare": "npm run build",
"test:browser": "npm run build:test && npm run unit-test:browser && npm run integration-test:browser",
"test:node": "npm run build:test && npm run unit-test:node && npm run integration-test:node",
"test": "npm run build:test && npm run unit-test && npm run integration-test",
"unit-test:browser": "echo skipped",
"unit-test:node": "echo skipped",
"unit-test": "npm run unit-test:node && npm run unit-test:browser",
"build:samples": "echo Skipped.",
"docs": "echo Skipped."
},
"dependencies": {
"@azure/event-hubs": "^2.1.4",
"@azure/event-processor-host": "^2.0.0",
"async-lock": "^1.1.3",
"death": "^1.1.0",
"debug": "^4.1.1",
"rhea": "^1.0.24",
"tslib": "^2.0.0",
"uuid": "^8.3.0",
"yargs": "^15.0.0"
},
"devDependencies": {
"@types/node": "^8.0.0",
"@types/uuid": "^8.0.0",
"@types/yargs": "^15.0.5",
"eslint": "^7.15.0",
"rimraf": "^3.0.0",
"typescript": "~4.2.0"
}
}

Просмотреть файл

@ -1,48 +0,0 @@
{
"compilerOptions": {
/* Basic Options */
"target": "es6" /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017','ES2018' or 'ESNEXT'. */,
"module": "es6" /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', or 'ESNext'. */,
"declaration": true /* Generates corresponding '.d.ts' file. */,
"declarationMap": true /* Generates a sourcemap for each corresponding '.d.ts' file. */,
"sourceMap": true /* Generates corresponding '.map' file. */,
"inlineSources": true,
"outDir": "./dist-esm" /* Redirect output structure to the directory. */,
"declarationDir": "./typings" /* Output directory for generated declaration files.*/,
"importHelpers": true /* Import emit helpers from 'tslib'. */,
/* Strict Type-Checking Options */
"alwaysStrict": true,
"strictNullChecks": true,
"strictPropertyInitialization": true,
"noImplicitReturns": true /* Report error when not all code paths in function return a value. */,
/* Additional Checks */
"noUnusedLocals": true /* Report errors on unused locals. */,
/* Module Resolution Options */
"moduleResolution": "node" /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */,
"allowSyntheticDefaultImports": true /* Allow default imports from modules with no default export. This does not affect code emit, just typechecking. */,
"esModuleInterop": true /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */,
/* Experimental Options */
"forceConsistentCasingInFileNames": true,
/* Other options */
"newLine": "LF" /* Use the specified end of line sequence to be used when emitting files: "crlf" (windows) or "lf" (unix).*/,
"allowJs": false /* Don't allow JavaScript files to be compiled.*/,
"lib": [
"es2015",
"dom",
"dom.iterable",
"es5",
"es6",
"es7",
"esnext",
"esnext.asynciterable",
"es2015.iterable"
]
},
"exclude": ["node_modules"],
"include": ["cli.ts", "./commands"]
}

Просмотреть файл

@ -1,25 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
export function log(message: string, ...args: any[]): void {
console.log(`%s ${message}`, new Date().toISOString(), ...args);
}
let currentCommand: "send" | "receive" | "send-receive" | "testhub" | "eph" = "testhub";
export function setCurrentCommand(cmd: "send" | "receive" | "send-receive" | "eph"): void {
currentCommand = cmd;
}
export function getCurrentCommand(): "send" | "receive" | "send-receive" | "testhub" | "eph" {
return currentCommand;
}
/**
* Generates a random number between the given interval
* @param {number} min Min number of the range (inclusive).
* @param {number} max Max number of the range (inclusive).
*/
export function randomNumberFromInterval(min: number, max: number): number {
return Math.floor(Math.random() * (max - min + 1) + min);
}