зеркало из
1
0
Форкнуть 0

updating receiveOnMessage to receive

This commit is contained in:
Amar Zavery 2018-04-24 16:12:39 -07:00
Родитель 4212cc2add
Коммит 1a21f73188
16 изменённых файлов: 42 добавлений и 38 удалений

Просмотреть файл

@ -14,15 +14,16 @@ npm install azure-event-hubs
The simplest usage is to use the static factory method `EventHubClient.createFromConnectionString(_connection-string_, _event-hub-path_)`. Once you have a client, you can use it for:
### Sending events
- Client object methods to `client.send()` a single message that allows you to easily send messages.
- You can even batch multiple messages together using `client.sendBatch()` method.
- You can send a single event using `client.send()` method.
- You can even batch multiple events together using `client.sendBatch()` method.
### Receiving events
- You can use `await client.receive(...)` to receive a desired number of messages for a specified amount of time. Note this is a blocking call. That is it will return an array of EventData objects
once it receives the desired number of messages or the max wait time occurs (which ever happens first). This is a useful method for testing/debugging purposes.
- For production we would expect customers would simply want to receive messages and process them. Hence we have a `client.receiveOnMessage(. . .)` method on the receiver.
This message takes the `messageHandler()` and the `errorHandler()` amongst other parameters and registers them to the receiver. This method returns a `receiverHandler` that can be used to
stop receiving further events `await receiverHandler.stop()`
- You can use `await client.receive(...)` to receive desired number of events for specified amount of time. **Note this is a blocking call**.
That is it will return an array of EventData objects once it receives the desired number of events or the max wait time occurs (which ever happens first).
This is very useful when you want to know how the received events look like or for testing/debugging purposes.
- For production we would expect customers would simply want to receive events and process them. Hence we have a `client.receive(. . .)` method on the receiver.
This message takes the `messageHandler()` and the `errorHandler()` amongst other parameters and registers them to the receiver.
This method returns a `ReceiverHandler` that can be used to stop receiving further events `await receiverHandler.stop()`
## IDE ##
This sdk has been developed in [TypeScript](https://typescriptlang.org) and has good source code documentation. It is highly recommended to use [vscode](https://code.visualstudio.com) or any other IDE that provides better intellisense and exposes the full power of source code documentation.
@ -60,7 +61,7 @@ main().catch((err) => {
```
## Example 2.1 - Receive events with handlers
This mechanism is useful for receiving messages for a longer duration.
This mechanism is useful for receiving events for a longer duration.
Receive events from partition ID 1 after the current time.
```js
@ -79,7 +80,7 @@ function async main() {
console.log("Enqueued Time: ", enqueuedTime);
});
const receiveHandler = client.receiveOnMessage("1", onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
const receiveHandler = client.receive("1", onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
// To stop receiving events later on...
await receiveHandler.stop();
@ -93,14 +94,14 @@ main().catch((err) => {
## Example 2.2 - Receive specified number of events for a given time
This mechanism is useful when you want to see how the received events look like. It can also be useful for debugging purpose.
Receive events from partition ID 1 after the current time.
Receive events from partitionId `"1"` after the current time.
```js
const { EventHubClient, EventPosition } = require('azure-event-hubs');
const client = EventHubClient.createFromConnectionString(process.env["EVENTHUB_CONNECTION_STRING"], process.env["EVENTHUB_NAME"]);
function async main() {
const datas = await receiver.receiveBatch("1", 100 /*number of messages*/, 20 /*amount of time in seconds the receiver should run. Default 60 seconds.*/, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
const datas = await receiver.receiveBatch("1", 100 /*number of events*/, 20 /*amount of time in seconds the receiver should run. Default 60 seconds.*/, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
console.log("Array of EventData objects", datas);
}
@ -111,7 +112,7 @@ main().catch((err) => {
## Example 3 - Send an event with partition key.
Send an event with a given partition "key" which is then hashed to a partition ID (so all messages with the same key will go to the same ID, but load is balanced between partitions).
Send an event with a given "partition-key" which is then hashed to a partition ID (so all events with the same key will go to the same ID, but load is balanced between partitions).
```js
const { EventHubClient, EventPosition } = require('azure-event-hubs');
@ -174,4 +175,4 @@ main().catch((err) => {
```
## AMQP Dependencies ##
It depends on [rhea](https://github.com/amqp/rhea) library for managing connections, sending and receiving messages over the [AMQP](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf) protocol.
It depends on [rhea](https://github.com/amqp/rhea) library for managing connections, sending and receiving events over the [AMQP](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf) protocol.

Просмотреть файл

@ -1,3 +1,6 @@
## 2018-04-24 0.1.1
- Changing `client.receiveOnMessage()` to `client.receive()` as that is a better naming convention and is in sync with other language sdks.
## 2018-04-23 0.1.0
- Previously we were depending on [amqp10](https://npmjs.com/package/amqp10) package for the amqp protocol. Moving forward we will be depending on [rhea](https://npmjs.com/package/rhea).
- The public facing API of this library has major breaking changes from the previous version 0.0.8. Please take a look at the [Readme](./README.md) and the [examples](./examples) directory for detailed samples.

Просмотреть файл

@ -22,7 +22,7 @@ async function main(): Promise<void> {
const onError: OnError = (err: EventHubsError | Error) => {
console.log(">>>>> Error occurred: ", err);
};
client.receiveOnMessage("0", onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
client.receive("0", onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
console.log("Created Receiver for partition 0 and CG $default.");
const messageCount = 5;

Просмотреть файл

@ -18,7 +18,7 @@ async function main(): Promise<void> {
console.log("@@@@ receiver with epoch 2.");
console.log(">>>>> Error occurred for receiver with epoch 2: ", err);
};
client.receiveOnMessage("0", onMessage, onError, { epoch: 2 });
client.receive("0", onMessage, onError, { epoch: 2 });
console.log("$$$$ Waiting for 8 seconds to let receiver 1 set up and start receiving messages...");
await delay(8000);
@ -32,7 +32,7 @@ async function main(): Promise<void> {
console.log(">>>>> Error occurred for receiver with epoch 1: ", err);
};
console.log("$$$$ Will start receiving messages from receiver with epoch value 1...");
client.receiveOnMessage("0", onMessage2, onError2, { epoch: 1 });
client.receive("0", onMessage2, onError2, { epoch: 1 });
}
main().catch((err) => {

Просмотреть файл

@ -30,7 +30,7 @@ function main() {
const onError = (err) => {
console.log(">>>>> Error occurred: ", err);
};
client.receiveOnMessage("0", onMessage, onError, { eventPosition: azure_arm_event_hubs_1.EventPosition.fromEnqueuedTime(Date.now()) });
client.receive("0", onMessage, onError, { eventPosition: azure_arm_event_hubs_1.EventPosition.fromEnqueuedTime(Date.now()) });
console.log("Created Receiver for partition 0 and CG $default.");
const messageCount = 5;
let datas = [];

Просмотреть файл

@ -26,7 +26,7 @@ function main() {
console.log("@@@@ receiver with epoch 2.");
console.log(">>>>> Error occurred for receiver with epoch 2: ", err);
};
client.receiveOnMessage("0", onMessage, onError, { epoch: 2 });
client.receive("0", onMessage, onError, { epoch: 2 });
console.log("$$$$ Waiting for 8 seconds to let receiver 1 set up and start receiving messages...");
yield azure_arm_event_hubs_1.delay(8000);
const onMessage2 = (eventData) => {
@ -39,7 +39,7 @@ function main() {
console.log(">>>>> Error occurred for receiver with epoch 1: ", err);
};
console.log("$$$$ Will start receiving messages from receiver with epoch value 1...");
client.receiveOnMessage("0", onMessage2, onError2, { epoch: 1 });
client.receive("0", onMessage2, onError2, { epoch: 1 });
});
}
main().catch((err) => {

Просмотреть файл

@ -29,7 +29,7 @@ function main() {
console.log(">>>>> Error occurred: ", err);
};
//console.log(onMessage, onError);
client.receiveOnMessage(ids[i], onMessage, onError, { eventPosition: azure_arm_event_hubs_1.EventPosition.fromEnqueuedTime(Date.now()) });
client.receive(ids[i], onMessage, onError, { eventPosition: azure_arm_event_hubs_1.EventPosition.fromEnqueuedTime(Date.now()) });
// giving some time for receiver setup to complete. This will make sure that the receiver can receive the newly sent
// message from now onwards.
yield azure_arm_event_hubs_1.delay(3000);

Просмотреть файл

@ -27,7 +27,7 @@ function main() {
eventPosition: azure_arm_event_hubs_1.EventPosition.fromEnqueuedTime(Date.now()),
enableReceiverRuntimeMetric: true
};
const rcvHandler = client.receiveOnMessage("0", onMessage, onError, options);
const rcvHandler = client.receive("0", onMessage, onError, options);
console.log("rcvHandler: ", rcvHandler.name);
});
}

Просмотреть файл

@ -19,7 +19,7 @@ async function main(): Promise<void> {
console.log(">>>>> Error occurred: ", err);
};
//console.log(onMessage, onError);
client.receiveOnMessage(ids[i], onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
client.receive(ids[i], onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
// giving some time for receiver setup to complete. This will make sure that the receiver can receive the newly sent
// message from now onwards.
await delay(3000);

Просмотреть файл

@ -19,7 +19,7 @@ async function main(): Promise<void> {
eventPosition: EventPosition.fromEnqueuedTime(Date.now()),
enableReceiverRuntimeMetric: true
}
const rcvHandler = client.receiveOnMessage("0", onMessage, onError, options);
const rcvHandler = client.receive("0", onMessage, onError, options);
console.log("rcvHandler: ", rcvHandler.name);
}

Просмотреть файл

@ -168,13 +168,13 @@ export class EventHubClient {
*
* @returns {ReceiveHandler} ReceiveHandler - An object that provides a mechanism to stop receiving more messages.
*/
receiveOnMessage(partitionId: string | number, onMessage: OnMessage, onError: OnError, options?: ReceiveOptions): ReceiveHandler {
receive(partitionId: string | number, onMessage: OnMessage, onError: OnError, options?: ReceiveOptions): ReceiveHandler {
if (!partitionId || (partitionId && typeof partitionId !== "string" && typeof partitionId !== "number")) {
throw new Error("'partitionId' is a required parameter and must be of type: 'string' | 'number'.");
}
const sReceiver = StreamingReceiver.create(this._context, partitionId, options);
this._context.receivers[sReceiver.name] = sReceiver;
sReceiver.receiveOnMessage(onMessage, onError);
sReceiver.receive(onMessage, onError);
return new ReceiveHandler(sReceiver);
}

Просмотреть файл

@ -35,7 +35,7 @@ export class ReceiveHandler {
/**
* @property {ReceiverRuntimeInfo} runtimeInfo The receiver runtime info. This property will only
* be enabled when `enableReceiverRuntimeMetric` option is set to true in the
* `client.receiveOnMessage()` method.
* `client.receive()` method.
* @readonly
*/
get runtimeInfo(): ReceiverRuntimeInfo | undefined {
@ -90,7 +90,7 @@ export class StreamingReceiver extends EventHubReceiver {
* @param {OnMessage} onMessage The message handler to receive event data objects.
* @param {OnError} onError The error handler to receive an error that occurs while receivin messages.
*/
receiveOnMessage(onMessage: OnMessage, onError: OnError): void {
receive(onMessage: OnMessage, onError: OnError): void {
if (!onMessage || typeof onMessage !== "function") {
throw new Error("'onMessage' is a required parameter and must be of type 'function'.");
}

Просмотреть файл

@ -43,5 +43,5 @@ export const defaultPrefetchCount = 1000;
export const reconnectLimit = 100;
export const packageJsonInfo = {
name: "azure-event-hubs-js",
version: "0.1.0"
version: "0.1.1"
};

Просмотреть файл

@ -91,7 +91,7 @@ export async function handler(argv: any): Promise<void> {
const onError = (err: any) => {
console.log("An error occured with the receiver: %o", err);
};
client.receiveOnMessage(id, onMessage, onError, { consumerGroup: consumerGroup, eventPosition: EventPosition.fromOffset(offset, true) });
client.receive(id, onMessage, onError, { consumerGroup: consumerGroup, eventPosition: EventPosition.fromOffset(offset, true) });
}
}
} catch (err) {

Просмотреть файл

@ -152,7 +152,7 @@ describe("EventHubClient on ", function () {
debug(">>>>>>>> error occurred", error);
done(should.equal(error.name, "MessagingEntityNotFoundError"));
}
client.receiveOnMessage("0", onMessage, onError, { consumerGroup: "some-randome-name" });
client.receive("0", onMessage, onError, { consumerGroup: "some-randome-name" });
debug(">>>>>>>> attached the error handler on the receiver...");
} catch (err) {
debug(">>> Some error", err);

Просмотреть файл

@ -275,7 +275,7 @@ describe("EventHub Receiver", function () {
const onMsg = (data) => {
debug(">>>> epoch Receiver 1", data);
};
epochRcvr1 = client.receiveOnMessage(partitionId, onMsg, onError, { epoch: 1, eventPosition: EventPosition.fromEnd() });
epochRcvr1 = client.receive(partitionId, onMsg, onError, { epoch: 1, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver 1 %s", epochRcvr1.name);
setTimeout(() => {
const onError2 = (error) => {
@ -285,7 +285,7 @@ describe("EventHub Receiver", function () {
const onMsg2 = (data) => {
debug(">>>> epoch Receiver 2", data);
};
epochRcvr2 = client.receiveOnMessage(partitionId, onMsg, onError, { epoch: 2, eventPosition: EventPosition.fromEnd() });
epochRcvr2 = client.receive(partitionId, onMsg, onError, { epoch: 2, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver 2 %s", epochRcvr2.name);
}, 3000);
});
@ -301,7 +301,7 @@ describe("EventHub Receiver", function () {
const onmsg1 = (data) => {
debug(">>>> epoch Receiver ", data);
};
epochRcvr = client.receiveOnMessage(partitionId, onmsg1, onerr1, { epoch: 1, eventPosition: EventPosition.fromEnd() });
epochRcvr = client.receive(partitionId, onmsg1, onerr1, { epoch: 1, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver %s", epochRcvr.name);
const onerr2 = (error) => {
debug(">>>> non epoch Receiver", error);
@ -321,7 +321,7 @@ describe("EventHub Receiver", function () {
const onmsg2 = (data) => {
debug(">>>> non epoch Receiver", data);
};
nonEpochRcvr = client.receiveOnMessage(partitionId, onmsg2, onerr2, { eventPosition: EventPosition.fromEnd() });
nonEpochRcvr = client.receive(partitionId, onmsg2, onerr2, { eventPosition: EventPosition.fromEnd() });
debug("Created non epoch receiver %s", nonEpochRcvr.name);
});
@ -347,7 +347,7 @@ describe("EventHub Receiver", function () {
const onmsg3 = (data) => {
debug(">>>> non epoch Receiver", data);
};
nonEpochRcvr = client.receiveOnMessage(partitionId, onmsg3, onerr3, { eventPosition: EventPosition.fromEnd() });
nonEpochRcvr = client.receive(partitionId, onmsg3, onerr3, { eventPosition: EventPosition.fromEnd() });
debug("Created non epoch receiver %s", nonEpochRcvr.name);
setTimeout(() => {
const onerr4 = (error) => {
@ -357,7 +357,7 @@ describe("EventHub Receiver", function () {
const onmsg4 = (data) => {
debug(">>>> epoch Receiver ", data);
};
epochRcvr = client.receiveOnMessage(partitionId, onmsg4, onerr4, { epoch: 1, eventPosition: EventPosition.fromEnd() });
epochRcvr = client.receive(partitionId, onmsg4, onerr4, { epoch: 1, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver %s", epochRcvr.name);
}, 3000);
});
@ -499,7 +499,7 @@ describe("EventHub Receiver", function () {
debug("@@@@ Error received by receiver %s", rcvrId);
debug(err);
};
const rcvHndlr = client.receiveOnMessage(partitionId, onMsg, onError, { eventPosition: EventPosition.fromStart(), identifier: rcvrId });
const rcvHndlr = client.receive(partitionId, onMsg, onError, { eventPosition: EventPosition.fromStart(), identifier: rcvrId });
rcvHndlrs.push(rcvHndlr);
}
debug(">>> Attached message handlers to each receiver.");
@ -524,7 +524,7 @@ describe("EventHub Receiver", function () {
done();
});
}
const failedRcvHandler = client.receiveOnMessage(partitionId, onmsg2, onerr2, { eventPosition: EventPosition.fromStart(), identifier: "rcvr-6" });
const failedRcvHandler = client.receive(partitionId, onmsg2, onerr2, { eventPosition: EventPosition.fromStart(), identifier: "rcvr-6" });
rcvHndlrs.push(failedRcvHandler);
}, 5000);
});