зеркало из
1
0
Форкнуть 0
This commit is contained in:
Amar Zavery 2018-04-21 01:20:16 -07:00
Родитель 7e5e722b87
Коммит 79040dc4b4
5 изменённых файлов: 236 добавлений и 351 удалений

Просмотреть файл

@ -7,7 +7,7 @@ import * as chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
import * as debugModule from "debug";
const debug = debugModule("azure:event-hubs:client-spec");
import { EventHubReceiver, EventHubSender, EventHubClient, EventHubPartitionRuntimeInformation } from "../lib";
import { EventHubClient, EventHubPartitionRuntimeInformation } from "../lib";
import { delay } from "../lib/util/utils";
function testFalsyValues(testFn) {
@ -98,27 +98,6 @@ describe("EventHubClient on ", function () {
});
});
describe("createSender", function () {
const ids = [0, "0", "1"];
ids.forEach(function (partitionId) {
it("returns a Sender when partitionId is " + partitionId, async function () {
client = EventHubClient.createFromConnectionString(service.connectionString!, service.path);
const sender = client.createSender(partitionId);
sender.should.be.an.instanceof(EventHubSender);
should.exist(sender.name!);
sender.partitionId!.should.equal(partitionId);
});
});
});
describe("createReceiver", function () {
it("returns a Receiver", async function () {
client = EventHubClient.createFromConnectionString(service.connectionString!, service.path);
const receiver = client.createReceiver("0");
should.equal(true, receiver instanceof EventHubReceiver);
});
});
describe("non existent eventhub", function () {
it("should throw MessagingEntityNotFoundError while getting hub runtime info", async function () {
try {
@ -143,8 +122,7 @@ describe("EventHubClient on ", function () {
it("should throw MessagingEntityNotFoundError while creating a sender", async function () {
try {
client = EventHubClient.createFromConnectionString(service.connectionString!, "bad" + Math.random());
const sender = client.createSender("0");
await sender.send({ body: "Hello World" });
await client.send({ body: "Hello World" }, "0");
} catch (err) {
debug(err);
should.equal(err.name, "MessagingEntityNotFoundError");
@ -154,8 +132,7 @@ describe("EventHubClient on ", function () {
it("should throw MessagingEntityNotFoundError while creating a receiver", async function () {
try {
client = EventHubClient.createFromConnectionString(service.connectionString!, "bad" + Math.random());
const receiver = client.createReceiver("0");
await receiver.receive(10, 5);
await client.receiveBatch("0", 10, 5);
} catch (err) {
debug(err);
should.equal(err.name, "MessagingEntityNotFoundError");
@ -168,8 +145,6 @@ describe("EventHubClient on ", function () {
try {
client = EventHubClient.createFromConnectionString(service.connectionString!, service.path);
debug(">>>>>>>> client created.");
let receiver = client.createReceiver("0", { consumerGroup: "some-randome-name" });
debug(">>>>>>>> receiver created.", receiver.name!);
const onMessage = (data) => {
debug(">>>>> data: ", data);
};
@ -177,7 +152,7 @@ describe("EventHubClient on ", function () {
debug(">>>>>>>> error occurred", error);
done(should.equal(error.name, "MessagingEntityNotFoundError"));
}
receiver.start(onMessage, onError);
client.receiveOnMessage("0", onMessage, onError, { consumerGroup: "some-randome-name" });
debug(">>>>>>>> attached the error handler on the receiver...");
} catch (err) {
debug(">>> Some error", err);

Просмотреть файл

@ -26,7 +26,7 @@ const applicationProperties = {
const testMessage: AmqpMessage = {
body: testBody,
message_annotations: testAnnotations,
properties: messageProperties,
message_id: "test_id",
application_properties: applicationProperties
};
const testEventData = EventData.fromAmqpMessage(testMessage);
@ -43,7 +43,7 @@ describe("EventData", function () {
});
it("populates the properties with the message properties", function () {
testEventData.properties!.should.equal(messageProperties);
testEventData.properties.message_id!.should.equal(messageProperties.message_id);
});
it("populates the application properties with the message application properties", function () {
@ -65,7 +65,7 @@ describe("EventData", function () {
});
it("populates properties with the message properties", function () {
messageFromED.properties!.should.equal(messageProperties);
messageFromED.message_id!.should.equal(messageProperties.message_id);
});
it("populates application_properties of the message", function () {

Просмотреть файл

@ -7,14 +7,14 @@ import * as chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
import * as debugModule from "debug";
const debug = debugModule("azure:event-hubs:misc-spec");
import { EventPosition, EventHubClient, EventHubReceiver, EventData, EventHubRuntimeInformation, EventHubSender } from "../lib";
import { EventPosition, EventHubClient, EventData, EventHubRuntimeInformation } from "../lib";
import { BatchingReceiver } from "../lib/batchingReceiver";
describe("Misc tests", function () {
this.timeout(600000);
this.timeout(60000);
const service = { connectionString: process.env.EVENTHUB_CONNECTION_STRING, path: process.env.EVENTHUB_NAME };
let client: EventHubClient = EventHubClient.createFromConnectionString(service.connectionString!, service.path);
let receiver: EventHubReceiver;
let sender: EventHubSender;
let breceiver: BatchingReceiver;
let hubInfo: EventHubRuntimeInformation;
before("validate environment", async function () {
should.exist(process.env.EVENTHUB_CONNECTION_STRING,
@ -28,17 +28,6 @@ describe("Misc tests", function () {
await client.close();
});
afterEach("close the sender link", async function () {
if (sender) {
await sender.close();
debug("Sender closed.");
}
if (receiver) {
await receiver.close();
debug("Receiver closed.");
}
});
it("should be able to send and receive a large message correctly", async function () {
const bodysize = 220 * 1024;
const partitionId = hubInfo.partitionIds[0];
@ -46,11 +35,12 @@ describe("Misc tests", function () {
const msgBody = Buffer.from(msgString);
const obj: EventData = { body: msgBody };
debug("Sending one message with %d bytes.", bodysize);
receiver = client.createReceiver(partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
sender = client.createSender(partitionId);
await sender.send(obj);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
let datas = await breceiver.receive(5, 5);
datas.length.should.equal(0);
await client.send(obj, partitionId);
debug("Successfully sent the large message.");
let datas = await receiver.receive(10, 10);
datas = await breceiver.receive(5, 10);
debug("received message: ", datas);
should.exist(datas);
datas.length.should.equal(1);
@ -60,18 +50,20 @@ describe("Misc tests", function () {
it("should be able to send and receive batched messages correctly", async function () {
try {
const partitionId = hubInfo.partitionIds[0];
receiver = client.createReceiver(partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
sender = client.createSender(partitionId);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
let datas = await breceiver.receive(5, 10);
datas.length.should.equal(0);
const messageCount = 5;
let d: EventData[] = [];
for (let i = 0; i < messageCount; i++) {
let obj: EventData = { body: `Hello EH ${i}` };
d.push(obj);
}
d[0].partitionKey = 'pk1234656';
await sender.sendBatch(d, 'pk1234656');
await client.sendBatch(d, partitionId);
debug("Successfully sent 5 messages batched together.");
let datas = await receiver.receive(5, 10);
datas = await breceiver.receive(5, 10);
debug("received message: ", datas);
should.exist(datas);
datas.length.should.equal(5);
@ -95,17 +87,15 @@ describe("Misc tests", function () {
function getRandomInt(max) {
return Math.floor(Math.random() * Math.floor(max));
}
sender = client.createSender();
for (let i = 0; i < msgToSendCount; i++) {
const partitionKey = getRandomInt(10);
await sender.send({ body: "Hello EventHub " + i }, partitionKey.toString());
await client.send({ body: "Hello EventHub " + i, partitionKey: partitionKey.toString() });
}
debug("Starting to receive all messages from each partition.");
let partitionMap = {};
let totalReceived = 0;
for (let id of partitionIds) {
receiver = client.createReceiver(id, { eventPosition: EventPosition.fromOffset(partitionOffsets[id]) });
let datas = await receiver.receive(50, 10);
let datas = await client.receiveBatch(id, 50, 10, { eventPosition: EventPosition.fromOffset(partitionOffsets[id]) });
debug(`Received ${datas.length} messages from partition ${id}.`);
for (let d of datas) {
debug(">>>> _raw_amqp_mesage: ", d._raw_amqp_mesage)

Просмотреть файл

@ -8,15 +8,15 @@ import * as chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
import * as debugModule from "debug";
const debug = debugModule("azure:event-hubs:receiver-spec");
import { EventPosition, EventHubClient, EventHubReceiver, EventData, EventHubRuntimeInformation, EventHubSender } from "../lib";
import { delay } from "../lib/util/utils";
import { EventPosition, EventHubClient, EventData, EventHubRuntimeInformation, delay } from "../lib";
import { BatchingReceiver } from "../lib/batchingReceiver"
import { ReceiveHandler } from "../lib/streamingReceiver";
describe("EventHub Receiver", function () {
this.timeout(30000);
this.timeout(60000);
const service = { connectionString: process.env.EVENTHUB_CONNECTION_STRING, path: process.env.EVENTHUB_NAME };
let client: EventHubClient = EventHubClient.createFromConnectionString(service.connectionString!, service.path);
let receiver: EventHubReceiver;
let sender: EventHubSender;
let breceiver: BatchingReceiver;
let hubInfo: EventHubRuntimeInformation;
before("validate environment", async function () {
should.exist(process.env.EVENTHUB_CONNECTION_STRING,
@ -31,32 +31,25 @@ describe("EventHub Receiver", function () {
});
afterEach("close the sender link", async function () {
if (sender) {
await sender.close();
debug("Sender closed.");
}
if (receiver) {
await receiver.close();
debug("Receiver closed.");
if (breceiver) {
await breceiver.close();
debug("After each - Batching Receiver closed.");
}
});
describe("with EventPosition specified as", function () {
it("'from end of stream' should receive messages correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
sender = client.createSender(partitionId);
for (let i = 0; i < 10; i++) {
const ed: EventData = {
body: "Hello awesome world " + i
}
await sender.send(ed);
await client.send(ed, partitionId);
debug("sent message - " + i);
}
debug("Creating new receiver with offset EndOfStream");
receiver = client.createReceiver(partitionId, { eventPosition: EventPosition.fromEnd() });
debug("Establishing the receiver link...");
const d = await receiver.receive(10, 3);
d.length.should.equal(0);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnd() });
await breceiver.receive(10, 5);
// send a new message. We should only receive this new message.
const uid = uuid();
const ed: EventData = {
@ -65,25 +58,24 @@ describe("EventHub Receiver", function () {
stamp: uid
}
}
await sender.send(ed);
await client.send(ed, partitionId);
debug(">>>>>>> Sent the new message after creating the receiver. We should only receive this message.");
const datas = await receiver.receive(10, 10);
const datas = await breceiver.receive(10, 5);
debug("received messages: ", datas);
datas.length.should.equal(1);
datas[0].applicationProperties!.stamp.should.equal(uid);
debug("Next receive on this partition should not receive any messages.");
const datas2 = await receiver.receive(10, 10);
const datas2 = await breceiver.receive(10, 10);
datas2.length.should.equal(0);
});
it("'after a particular offset' should receive messages correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
const pInfo = await client.getPartitionInformation(partitionId);
sender = client.createSender(partitionId);
debug(`Creating new receiver with last enqueued offset: "${pInfo.lastEnqueuedOffset}".`);
receiver = client.createReceiver(partitionId, { eventPosition: EventPosition.fromOffset(pInfo.lastEnqueuedOffset) });
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(pInfo.lastEnqueuedOffset) });
debug("Establishing the receiver link...");
const d = await receiver.receive(10, 5);
const d = await breceiver.receive(10, 5);
d.length.should.equal(0);
// send a new message. We should only receive this new message.
const uid = uuid();
@ -93,20 +85,19 @@ describe("EventHub Receiver", function () {
stamp: uid
}
}
await sender.send(ed);
await client.send(ed, "0");
debug("Sent the new message after creating the receiver. We should only receive this message.");
const datas = await receiver.receive(10, 5);
const datas = await breceiver.receive(10, 10);
debug("received messages: ", datas);
datas.length.should.equal(1);
datas[0].applicationProperties!.stamp.should.equal(uid);
debug("Next receive on this partition should not receive any messages.");
const datas2 = await receiver.receive(10, 10);
const datas2 = await breceiver.receive(10, 10);
datas2.length.should.equal(0);
});
it("'after a particular offset with isInclusive true' should receive messages correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
sender = client.createSender(partitionId);
const uid = uuid();
const ed: EventData = {
body: "New message after last enqueued offset",
@ -114,7 +105,7 @@ describe("EventHub Receiver", function () {
stamp: uid
}
}
await sender.send(ed);
await client.send(ed, partitionId);
debug(`Sent message 1 with stamp: ${uid}.`);
const pInfo = await client.getPartitionInformation(partitionId);
const uid2 = uuid();
@ -124,29 +115,28 @@ describe("EventHub Receiver", function () {
stamp: uid2
}
}
await sender.send(ed2);
await client.send(ed2, partitionId);
debug(`Sent message 2 with stamp: ${uid} after getting the enqueued offset.`);
debug(`Creating new receiver with last enqueued offset: "${pInfo.lastEnqueuedOffset}".`);
receiver = client.createReceiver(partitionId, { eventPosition: EventPosition.fromOffset(pInfo.lastEnqueuedOffset, true) });
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(pInfo.lastEnqueuedOffset, true) });
debug("We should receive the last 2 messages.");
const datas = await receiver.receive(10, 5);
const datas = await breceiver.receive(10, 10);
debug("received messages: ", datas);
datas.length.should.equal(2);
datas[0].applicationProperties!.stamp.should.equal(uid);
datas[1].applicationProperties!.stamp.should.equal(uid2);
debug("Next receive on this partition should not receive any messages.");
const datas2 = await receiver.receive(10, 10);
const datas2 = await breceiver.receive(10, 5);
datas2.length.should.equal(0);
});
it("'from a particular enqueued time' should receive messages correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
const pInfo = await client.getPartitionInformation(partitionId);
sender = client.createSender(partitionId);
debug(`Creating new receiver with last enqueued time: "${pInfo.lastEnqueuedTimeUtc}".`);
receiver = client.createReceiver(partitionId, { eventPosition: EventPosition.fromEnqueuedTime(pInfo.lastEnqueuedTimeUtc) });
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(pInfo.lastEnqueuedTimeUtc) });
debug("Establishing the receiver link...");
const d = await receiver.receive(10, 3);
const d = await breceiver.receive(10, 5);
d.length.should.equal(0);
// send a new message. We should only receive this new message.
const uid = uuid();
@ -156,21 +146,20 @@ describe("EventHub Receiver", function () {
stamp: uid
}
}
await sender.send(ed);
await client.send(ed, partitionId);
debug("Sent the new message after creating the receiver. We should only receive this message.");
const datas = await receiver.receive(10, 5);
const datas = await breceiver.receive(10, 10);
debug("received messages: ", datas);
datas.length.should.equal(1);
datas[0].applicationProperties!.stamp.should.equal(uid);
debug("Next receive on this partition should not receive any messages.");
const datas2 = await receiver.receive(10, 15);
const datas2 = await breceiver.receive(10, 5)
datas2.length.should.equal(0);
});
it("'after the particular sequence number' should receive messages correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
const pInfo = await client.getPartitionInformation(partitionId);
sender = client.createSender(partitionId);
// send a new message. We should only receive this new message.
const uid = uuid();
const ed: EventData = {
@ -179,22 +168,21 @@ describe("EventHub Receiver", function () {
stamp: uid
}
}
await sender.send(ed);
await client.send(ed, partitionId);
debug("Sent the new message after getting the partition runtime information. We should only receive this message.");
debug(`Creating new receiver with last enqueued sequence number: "${pInfo.lastSequenceNumber}".`);
receiver = client.createReceiver(partitionId, { eventPosition: EventPosition.fromSequenceNumber(pInfo.lastSequenceNumber) });
const datas = await receiver.receive(10, 15);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromSequenceNumber(pInfo.lastSequenceNumber) });
const datas = await breceiver.receive(10, 10);
debug("received messages: ", datas);
datas.length.should.equal(1);
datas[0].applicationProperties!.stamp.should.equal(uid);
debug("Next receive on this partition should not receive any messages.");
const datas2 = await receiver.receive(10, 10);
const datas2 = await breceiver.receive(10, 5);
datas2.length.should.equal(0);
});
it("'after the particular sequence number' with isInclusive true should receive messages correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
sender = client.createSender(partitionId);
const uid = uuid();
const ed: EventData = {
body: "New message before getting the last sequence number",
@ -202,7 +190,7 @@ describe("EventHub Receiver", function () {
stamp: uid
}
}
await sender.send(ed);
await client.send(ed, partitionId);
debug(`Sent message 1 with stamp: ${uid}.`);
const pInfo = await client.getPartitionInformation(partitionId);
const uid2 = uuid();
@ -212,18 +200,18 @@ describe("EventHub Receiver", function () {
stamp: uid2
}
}
await sender.send(ed2);
await client.send(ed2, partitionId);
debug(`Sent message 2 with stamp: ${uid}.`);
debug(`Creating new receiver with last sequence number: "${pInfo.lastSequenceNumber}".`);
receiver = client.createReceiver(partitionId, { eventPosition: EventPosition.fromSequenceNumber(pInfo.lastSequenceNumber, true) });
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromSequenceNumber(pInfo.lastSequenceNumber, true) });
debug("We should receive the last 2 messages.");
const datas = await receiver.receive(10, 10);
const datas = await breceiver.receive(10, 10);
debug("received messages: ", datas);
datas.length.should.equal(2);
datas[0].applicationProperties!.stamp.should.equal(uid);
datas[1].applicationProperties!.stamp.should.equal(uid2);
debug("Next receive on this partition should not receive any messages.");
const datas2 = await receiver.receive(10, 10);
const datas2 = await breceiver.receive(10, 5);
datas2.length.should.equal(0);
});
});
@ -231,53 +219,50 @@ describe("EventHub Receiver", function () {
describe("in batch mode", function () {
it("should receive messages correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
receiver = client.createReceiver(partitionId);
const datas = await receiver.receive(5, 10);
const datas = await client.receiveBatch(partitionId, 5, 10);
debug("received messages: ", datas);
datas.length.should.equal(5);
});
});
describe("with receiverRuntimeMetricEnabled", function () {
it("should have ReceiverRuntimeInfo populated", async function () {
const partitionId = hubInfo.partitionIds[0];
sender = client.createSender(partitionId);
for (let i = 0; i < 10; i++) {
const ed: EventData = {
body: "Hello awesome world " + i
}
await sender.send(ed);
debug("sent message - " + i);
}
debug("Getting the partition information");
const pInfo = await client.getPartitionInformation(partitionId);
debug("paritition info: ", pInfo);
debug("Creating new receiver with offset EndOfStream");
receiver = client.createReceiver(partitionId, { eventPosition: EventPosition.fromStart(), enableReceiverRuntimeMetric: true });
let datas = await receiver.receive(1, 10);
debug("receiver.runtimeInfo ", receiver.runtimeInfo);
datas.length.should.equal(1);
should.exist(receiver.runtimeInfo);
receiver.runtimeInfo!.lastEnqueuedOffset!.should.equal(pInfo.lastEnqueuedOffset);
receiver.runtimeInfo!.lastSequenceNumber!.should.equal(pInfo.lastSequenceNumber);
receiver.runtimeInfo!.lastEnqueuedTimeUtc!.getTime().should.equal(pInfo.lastEnqueuedTimeUtc.getTime());
receiver.runtimeInfo!.partitionId!.should.equal(pInfo.partitionId);
receiver.runtimeInfo!.retrievalTime!.getTime().should.be.greaterThan(Date.now() - 60000);
});
});
// describe("with receiverRuntimeMetricEnabled", function () {
// it("should have ReceiverRuntimeInfo populated", async function () {
// const partitionId = hubInfo.partitionIds[0];
// sender = client.createSender(partitionId);
// for (let i = 0; i < 10; i++) {
// const ed: EventData = {
// body: "Hello awesome world " + i
// }
// await sender.send(ed);
// debug("sent message - " + i);
// }
// debug("Getting the partition information");
// const pInfo = await client.getPartitionInformation(partitionId);
// debug("partition info: ", pInfo);
// debug("Creating new receiver with offset EndOfStream");
// receiver = client.createReceiver(partitionId, { eventPosition: EventPosition.fromStart(), enableReceiverRuntimeMetric: true });
// let datas = await receiver.receive(1, 10);
// debug("receiver.runtimeInfo ", receiver.runtimeInfo);
// datas.length.should.equal(1);
// should.exist(receiver.runtimeInfo);
// receiver.runtimeInfo!.lastEnqueuedOffset!.should.equal(pInfo.lastEnqueuedOffset);
// receiver.runtimeInfo!.lastSequenceNumber!.should.equal(pInfo.lastSequenceNumber);
// receiver.runtimeInfo!.lastEnqueuedTimeUtc!.getTime().should.equal(pInfo.lastEnqueuedTimeUtc.getTime());
// receiver.runtimeInfo!.partitionId!.should.equal(pInfo.partitionId);
// receiver.runtimeInfo!.retrievalTime!.getTime().should.be.greaterThan(Date.now() - 60000);
// });
// });
describe("with epoch", function () {
it("should behave correctly when 2 epoch receivers with different values are connecting to a partition in a consumer group", function (done) {
const partitionId = hubInfo.partitionIds[0];
let epochRcvr1: EventHubReceiver, epochRcvr2: EventHubReceiver;
let events: EventData[] = [];
epochRcvr1 = client.createReceiver(partitionId, { epoch: 1, eventPosition: EventPosition.fromEnd() });
let epochRcvr1: ReceiveHandler, epochRcvr2: ReceiveHandler
const onError = (error) => {
debug(">>>> epoch Receiver 1", error);
should.exist(error);
should.equal(error.name, "ReceiverDisconnectedError");
epochRcvr1.close()
.then(() => epochRcvr2.close())
epochRcvr1.stop()
.then(() => epochRcvr2.stop())
.then(() => {
debug("Successfully closed the epoch receivers 1 and 2.");
done();
@ -290,10 +275,9 @@ describe("EventHub Receiver", function () {
const onMsg = (data) => {
debug(">>>> epoch Receiver 1", data);
};
epochRcvr1.start(onMsg, onError);
epochRcvr1 = client.receiveOnMessage(partitionId, onMsg, onError, { epoch: 1, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver 1 %s", epochRcvr1.name);
setTimeout(() => {
epochRcvr2 = client.createReceiver(partitionId, { epoch: 2, eventPosition: EventPosition.fromEnd() });
const onError2 = (error) => {
debug(">>>> epoch Receiver 2", error);
throw new Error("An Error should not have happened for epoch receiver with epoch value 2.");
@ -301,16 +285,15 @@ describe("EventHub Receiver", function () {
const onMsg2 = (data) => {
debug(">>>> epoch Receiver 2", data);
};
epochRcvr2.start(onMsg, onError);
epochRcvr2 = client.receiveOnMessage(partitionId, onMsg, onError, { epoch: 2, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver 2 %s", epochRcvr2.name);
}, 3000);
});
it("should behave correctly when a non epoch receiver is created after an epoch receiver", function (done) {
const partitionId = hubInfo.partitionIds[0];
let epochRcvr: EventHubReceiver, nonEpochRcvr: EventHubReceiver;
let epochRcvr: ReceiveHandler, nonEpochRcvr: ReceiveHandler;
let events: EventData[] = [];
epochRcvr = client.createReceiver(partitionId, { epoch: 1, eventPosition: EventPosition.fromEnd() });
const onerr1 = (error) => {
debug(">>>> epoch Receiver ", error);
throw new Error("An Error should not have happened for epoch receiver with epoch value 1.");
@ -318,15 +301,14 @@ describe("EventHub Receiver", function () {
const onmsg1 = (data) => {
debug(">>>> epoch Receiver ", data);
};
epochRcvr.start(onmsg1, onerr1);
epochRcvr = client.receiveOnMessage(partitionId, onmsg1, onerr1, { epoch: 1, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver %s", epochRcvr.name);
nonEpochRcvr = client.createReceiver(partitionId, { eventPosition: EventPosition.fromEnd() });
const onerr2 = (error) => {
debug(">>>> non epoch Receiver", error);
should.exist(error);
should.equal(error.name, "ReceiverDisconnectedError");
nonEpochRcvr.close()
.then(() => epochRcvr.close())
nonEpochRcvr.stop()
.then(() => epochRcvr.stop())
.then(() => {
debug("Successfully closed the nonEpoch and epoch receivers");
done();
@ -339,21 +321,20 @@ describe("EventHub Receiver", function () {
const onmsg2 = (data) => {
debug(">>>> non epoch Receiver", data);
};
nonEpochRcvr.start(onmsg2, onerr2);
nonEpochRcvr = client.receiveOnMessage(partitionId, onmsg2, onerr2, { eventPosition: EventPosition.fromEnd() });
debug("Created non epoch receiver %s", nonEpochRcvr.name);
});
it("should behave correctly when an epoch receiver is created after a non epoch receiver", function (done) {
const partitionId = hubInfo.partitionIds[0];
let epochRcvr: EventHubReceiver, nonEpochRcvr: EventHubReceiver;
let epochRcvr: ReceiveHandler, nonEpochRcvr: ReceiveHandler;
let events: EventData[] = [];
nonEpochRcvr = client.createReceiver(partitionId, { eventPosition: EventPosition.fromEnd() });
const onerr3 = (error) => {
debug(">>>> non epoch Receiver", error);
should.exist(error);
should.equal(error.name, "ReceiverDisconnectedError");
nonEpochRcvr.close()
.then(() => epochRcvr.close())
nonEpochRcvr.stop()
.then(() => epochRcvr.stop())
.then(() => {
debug("Successfully closed the nonEpoch and epoch receivers");
done();
@ -366,10 +347,9 @@ describe("EventHub Receiver", function () {
const onmsg3 = (data) => {
debug(">>>> non epoch Receiver", data);
};
nonEpochRcvr.start(onmsg3, onerr3);
nonEpochRcvr = client.receiveOnMessage(partitionId, onmsg3, onerr3, { eventPosition: EventPosition.fromEnd() });
debug("Created non epoch receiver %s", nonEpochRcvr.name);
setTimeout(() => {
epochRcvr = client.createReceiver(partitionId, { epoch: 1, eventPosition: EventPosition.fromEnd() });
const onerr4 = (error) => {
debug(">>>> epoch Receiver ", error);
throw new Error("OnErr4 >> An Error should not have happened for epoch receiver with epoch value 1.");
@ -377,7 +357,7 @@ describe("EventHub Receiver", function () {
const onmsg4 = (data) => {
debug(">>>> epoch Receiver ", data);
};
epochRcvr.start(onmsg4, onerr4);
epochRcvr = client.receiveOnMessage(partitionId, onmsg4, onerr4, { epoch: 1, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver %s", epochRcvr.name);
}, 3000);
});
@ -390,12 +370,11 @@ describe("EventHub Receiver", function () {
invalidIds.forEach(function (id) {
it(`"${id}" should throw an error`, async function () {
try {
receiver = client.createReceiver(id);
debug("Created receiver and will be receiving messages from partition id ...", id);
const d = await receiver.receive(10, 3);
const d = await client.receiveBatch(id, 10, 3);
debug("received messages ", d.length);
} catch (err) {
debug("Receiver %s received an error", receiver.name, err);
debug("Receiver received an error", err);
should.exist(err);
should.equal(true, err.name === "ArgumentOutOfRangeError" || err.name === "InvalidOperationError");
}
@ -406,7 +385,7 @@ describe("EventHub Receiver", function () {
invalidIds2.forEach(function (id) {
it(`"${id}" should throw an error`, async function () {
try {
receiver = client.createReceiver(id);
const d = await client.receiveBatch(id, 10, 3);
} catch (err) {
debug(`>>>> Received error - `, err);
should.exist(err);
@ -415,129 +394,127 @@ describe("EventHub Receiver", function () {
});
});
it("should throw 'MessagingEntityNotFoundError' if a message is received after the receiver is closed.", async function () {
receiver = client.createReceiver("0", { eventPosition: EventPosition.fromEnd() });
receiver.should.be.instanceof(EventHubReceiver);
await receiver.receive(10, 3);
await receiver.close();
debug("closed receiver.");
try {
await receiver.receive(10, 3);
} catch (err) {
should.exist(err);
should.equal(err.name, "MessagingEntityNotFoundError");
}
});
// it("should throw 'MessagingEntityNotFoundError' if a message is received after the receiver is closed.", async function () {
// receiver = client.createReceiver("0", { eventPosition: EventPosition.fromEnd() });
// receiver.should.be.instanceof(EventHubReceiver);
// await receiver.receive(10, 3);
// await receiver.close();
// debug("closed receiver.");
// try {
// await receiver.receive(10, 3);
// } catch (err) {
// should.exist(err);
// should.equal(err.name, "MessagingEntityNotFoundError");
// }
// });
it("should throw 'InvalidOperationError' if the receiver has already started receiving messages and someone calls start again.", function (done) {
receiver = client.createReceiver("0", { eventPosition: EventPosition.fromEnd() });
receiver.should.be.instanceof(EventHubReceiver);
const onErr = (err) => {
debug("An error occurred while receiving messages from the EventHub.");
throw err;
};
const onMsg = (data) => {
};
receiver.start(onMsg, onErr);
try {
receiver.start(onMsg, onErr);
} catch (err) {
// debug(">>>> Eexpected error: ", err);
should.exist(err);
should.equal(err.name, "InvalidOperationError");
done();
}
});
// it("should throw 'InvalidOperationError' if the receiver has already started receiving messages and someone calls start again.", function (done) {
// receiver = client.createReceiver("0", { eventPosition: EventPosition.fromEnd() });
// receiver.should.be.instanceof(EventHubReceiver);
// const onErr = (err) => {
// debug("An error occurred while receiving messages from the EventHub.");
// throw err;
// };
// const onMsg = (data) => {
// };
// receiver.start(onMsg, onErr);
// try {
// receiver.start(onMsg, onErr);
// } catch (err) {
// // debug(">>>> Eexpected error: ", err);
// should.exist(err);
// should.equal(err.name, "InvalidOperationError");
// done();
// }
// });
it("should throw 'InvalidOperationError' if receiver.receive() is called after receiver.start().", async function () {
receiver = client.createReceiver("0", { eventPosition: EventPosition.fromEnd() });
receiver.should.be.instanceof(EventHubReceiver);
const onErr = (err) => {
debug("An error occurred while receiving messages from the EventHub.");
throw err;
};
const onMsg = (data) => {
};
receiver.start(onMsg, onErr);
try {
await receiver.receive(10, 3);
} catch (err) {
// debug(">>>> Eexpected error: ", err);
should.exist(err);
should.equal(err.name, "InvalidOperationError");
}
});
// it("should throw 'InvalidOperationError' if receiver.receive() is called after receiver.start().", async function () {
// receiver = client.createReceiver("0", { eventPosition: EventPosition.fromEnd() });
// receiver.should.be.instanceof(EventHubReceiver);
// const onErr = (err) => {
// debug("An error occurred while receiving messages from the EventHub.");
// throw err;
// };
// const onMsg = (data) => {
// };
// receiver.start(onMsg, onErr);
// try {
// await receiver.receive(10, 3);
// } catch (err) {
// // debug(">>>> Eexpected error: ", err);
// should.exist(err);
// should.equal(err.name, "InvalidOperationError");
// }
// });
it("should throw 'InvalidOperationError' if receiver.start() is called while receiver.receive() is executing.", async function () {
receiver = client.createReceiver("0", { eventPosition: EventPosition.fromEnd() });
receiver.should.be.instanceof(EventHubReceiver);
const onErr = (err) => {
debug("An error occurred while receiving messages from the EventHub.");
throw err;
};
const onMsg = (data) => {
};
try {
receiver.receive(10, 3);
receiver.start(onMsg, onErr);
} catch (err) {
debug(">>>> Eexpected error: ", err);
should.exist(err);
should.equal(err.name, "InvalidOperationError");
}
});
// it("should throw 'InvalidOperationError' if receiver.start() is called while receiver.receive() is executing.", async function () {
// receiver = client.createReceiver("0", { eventPosition: EventPosition.fromEnd() });
// receiver.should.be.instanceof(EventHubReceiver);
// const onErr = (err) => {
// debug("An error occurred while receiving messages from the EventHub.");
// throw err;
// };
// const onMsg = (data) => {
// };
// try {
// receiver.receive(10, 3);
// receiver.start(onMsg, onErr);
// } catch (err) {
// debug(">>>> Eexpected error: ", err);
// should.exist(err);
// should.equal(err.name, "InvalidOperationError");
// }
// });
it("should throw 'InvalidOperationError' if receiver.receive() is called while previous receiver.receive() is executing.", async function () {
receiver = client.createReceiver("0", { eventPosition: EventPosition.fromEnd() });
receiver.should.be.instanceof(EventHubReceiver);
const onErr = (err) => {
debug("An error occurred while receiving messages from the EventHub.");
throw err;
};
const onMsg = (data) => {
};
try {
receiver.receive(10, 3);
receiver.receive(5, 5);
} catch (err) {
debug(">>>> Eexpected error: ", err);
should.exist(err);
should.equal(err.name, "InvalidOperationError");
}
});
// it("should throw 'InvalidOperationError' if receiver.receive() is called while previous receiver.receive() is executing.", async function () {
// receiver = client.createReceiver("0", { eventPosition: EventPosition.fromEnd() });
// receiver.should.be.instanceof(EventHubReceiver);
// const onErr = (err) => {
// debug("An error occurred while receiving messages from the EventHub.");
// throw err;
// };
// const onMsg = (data) => {
// };
// try {
// receiver.receive(10, 3);
// receiver.receive(5, 5);
// } catch (err) {
// debug(">>>> Eexpected error: ", err);
// should.exist(err);
// should.equal(err.name, "InvalidOperationError");
// }
// });
it("should receive 'QuotaExceededError' when attempting to connect more than 5 receivers to a partition in a consumer group", function (done) {
const partitionId = hubInfo.partitionIds[0];
let rcvrs: EventHubReceiver[] = [];
let rcvHndlrs: ReceiveHandler[] = [];
debug(">>> Receivers length: ", rcvHndlrs.length);
for (let i = 1; i <= 5; i++) {
rcvrs.push(client.createReceiver(partitionId, { eventPosition: EventPosition.fromStart(), identifier: `rcvr-${i}` }));
}
debug(">>> Receivers length: ", rcvrs.length);
for (const rcvr of rcvrs) {
debug("[%s], %s", rcvr.identifier, rcvr.name);
const rcvrId = `rcvr-${i}`;
debug(rcvrId);
const onMsg = (data) => {
//debug("receiver %s, %o", rcvr.identifier!, data);
//debug("receiver %s, %o", rcvrId, data);
};
const onError = (err) => {
debug("@@@@ Error received by receiver %s", rcvr.identifier!);
debug("@@@@ Error received by receiver %s", rcvrId);
debug(err);
};
rcvr.start(onMsg, onError);
const rcvHndlr = client.receiveOnMessage(partitionId, onMsg, onError, { eventPosition: EventPosition.fromStart(), identifier: rcvrId });
rcvHndlrs.push(rcvHndlr);
}
debug(">>> Attached message handlers to each receiver.");
setTimeout(() => {
const failedRcvr = client.createReceiver(partitionId, { eventPosition: EventPosition.fromStart(), identifier: "rcvr-6" });
debug(`Created 6th receiver - ${failedRcvr.name}`);
debug(`Created 6th receiver - "rcvr-6"`);
const onmsg2 = (data) => {
//debug(data);
};
const onerr2 = (err) => {
debug("@@@@ Error received by receiver %s", failedRcvr.identifier!);
debug("@@@@ Error received by receiver rcvr-6");
debug(err);
should.equal(err.name, "QuotaExceededError");
let promises = [];
for (const rcvr of rcvrs) {
promises.push(rcvr.close());
for (const rcvr of rcvHndlrs) {
promises.push(rcvr.stop());
}
Promise.all(promises).then(() => {
debug("Successfully closed all the receivers..");
@ -547,8 +524,8 @@ describe("EventHub Receiver", function () {
done();
});
}
failedRcvr.start(onmsg2, onerr2);
rcvrs.push(failedRcvr);
const failedRcvHandler = client.receiveOnMessage(partitionId, onmsg2, onerr2, { eventPosition: EventPosition.fromStart(), identifier: "rcvr-6" });
rcvHndlrs.push(failedRcvHandler);
}, 5000);
});
});

Просмотреть файл

@ -7,13 +7,11 @@ import * as chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
import * as debugModule from "debug";
const debug = debugModule("azure:event-hubs:sender-spec");
import { EventHubClient, EventHubSender, EventData } from "../lib";
import { delay } from "../lib/util/utils";
import { EventHubClient, EventData, delay } from "../lib";
describe("EventHub Sender", function () {
this.timeout(6000);
const service = { connectionString: process.env.EVENTHUB_CONNECTION_STRING, path: process.env.EVENTHUB_NAME };
let client: EventHubClient = EventHubClient.createFromConnectionString(service.connectionString!, service.path);
let sender: EventHubSender;
before("validate environment", function () {
should.exist(process.env.EVENTHUB_CONNECTION_STRING,
"define EVENTHUB_CONNECTION_STRING in your environment before running integration tests.");
@ -26,80 +24,42 @@ describe("EventHub Sender", function () {
await client.close();
});
afterEach("close the sender link", async function () {
if (sender) {
debug("Closing the sender..");
await sender.close();
}
});
describe("Single message", function () {
it("should be sent successfully.", async function () {
sender = client.createSender();
sender.should.be.instanceof(EventHubSender);
let data: EventData = {
body: "Hello World"
}
const delivery = await sender.send(data);
const delivery = await client.send(data);
// debug(delivery);
delivery.id.should.equal(0);
delivery.format.should.equal(0);
delivery.settled.should.equal(true);
delivery.remote_settled.should.equal(true);
delivery.tag.toString().should.equal("0");
});
it("with partition key should be sent successfully.", async function () {
sender = client.createSender();
sender.should.be.instanceof(EventHubSender);
let data: EventData = {
body: "Hello World with partition key"
body: "Hello World with partition key",
partitionKey: "p1234"
}
const delivery = await sender.send(data, "p1234");
const delivery = await client.send(data);
// debug(delivery);
delivery.id.should.equal(0);
delivery.format.should.equal(0);
delivery.settled.should.equal(true);
delivery.remote_settled.should.equal(true);
delivery.tag.toString().should.equal("0");
});
it("should be sent successfully to a specific partition.", async function () {
sender = client.createSender("0");
sender.should.be.instanceof(EventHubSender);
let data: EventData = {
body: "Hello World"
}
const delivery = await sender.send(data);
const delivery = await client.send(data, "0");
// debug(delivery);
delivery.id.should.equal(0);
delivery.format.should.equal(0);
delivery.settled.should.equal(true);
delivery.remote_settled.should.equal(true);
delivery.tag.toString().should.equal("0");
});
it("should be sent successfully and a new amqp sender link should be created while sending a message again after it is closed.", async function () {
sender = client.createSender();
sender.should.be.instanceof(EventHubSender);
let data: EventData = {
body: "Hello World"
}
await sender.send(data);
debug("message sent successfully...");
should.exist((sender as any)._context.senders[sender.name]);
await sender.close();
debug("Closed sender...");
should.not.exist((sender as any)._context.senders[sender.name]);
data.body = "Hello World12";
await delay(2000);
await sender.send(data);
debug("Sent the message successfully again after closing previously...");
should.exist((sender as any)._context.senders[sender.name]);
await sender.close();
});
});
describe("Batch message", function () {
it("should be sent successfully.", async function () {
sender = client.createSender();
sender.should.be.instanceof(EventHubSender);
let data: EventData[] = [
{
body: "Hello World 1"
@ -108,36 +68,29 @@ describe("EventHub Sender", function () {
body: "Hello World 2"
}
];
const delivery = await sender.sendBatch(data);
debug(delivery);
delivery.id.should.equal(0);
const delivery = await client.sendBatch(data);
// debug(delivery);
delivery.format.should.equal(0x80013700);
delivery.settled.should.equal(true);
delivery.remote_settled.should.equal(true);
delivery.tag.toString().should.equal("0");
});
it("with partition key should be sent successfully.", async function () {
sender = client.createSender();
sender.should.be.instanceof(EventHubSender);
let data: EventData[] = [
{
body: "Hello World 1"
body: "Hello World 1",
partitionKey: "p1234"
},
{
body: "Hello World 2"
}
];
const delivery = await sender.sendBatch(data, "p1234");
debug(delivery);
delivery.id.should.equal(0);
const delivery = await client.sendBatch(data);
// debug(delivery);
delivery.format.should.equal(0x80013700);
delivery.settled.should.equal(true);
delivery.remote_settled.should.equal(true);
delivery.tag.toString().should.equal("0");
});
it("should be sent successfully to a specific partition.", async function () {
sender = client.createSender("0");
sender.should.be.instanceof(EventHubSender);
let data: EventData[] = [
{
body: "Hello World 1"
@ -146,81 +99,72 @@ describe("EventHub Sender", function () {
body: "Hello World 2"
}
];
const delivery = await sender.sendBatch(data);
debug(delivery);
delivery.id.should.equal(0);
const delivery = await client.sendBatch(data, "0");
// debug(delivery);
delivery.format.should.equal(0x80013700);
delivery.settled.should.equal(true);
delivery.remote_settled.should.equal(true);
delivery.tag.toString().should.equal("0");
});
});
describe("Multiple messages", function () {
it("should be sent successfully in parallel", async function () {
sender = client.createSender();
let promises = [];
for (let i = 0; i < 5; i++) {
promises.push(sender.send({ body: `Hello World ${i}` }));
promises.push(client.send({ body: `Hello World ${i}` }));
}
const result = await Promise.all(promises);
for (let i = 0; i < result.length; i++) {
const delivery = result[i];
// debug("delivery %d: %O", i, delivery);
delivery.id.should.equal(0);
delivery.format.should.equal(0);
delivery.settled.should.equal(true);
delivery.remote_settled.should.equal(true);
delivery.tag.toString().should.equal(`0`);
}
});
it("should be sent successfully in parallel by multiple senders", async function () {
let senders: EventHubSender[] = [];
const senderCount = 3;
try {
for (let i = 0; i < senderCount; i++) {
senders.push(client.createSender());
}
let promises = [];
for (let i = 0; i < senderCount; i++) {
debug(">>>>> Sending a message with sender %d", i);
promises.push(senders[i].send({ body: `Hello World ${i}` }));
if (i == 0) {
debug(">>>>> Sending a message to partition %d", i);
promises.push(client.send({ body: `Hello World ${i}` }, i));
} else if (i == 1) {
debug(">>>>> Sending a message to partition %d", i);
promises.push(client.send({ body: `Hello World ${i}` }, i));
} else {
debug(">>>>> Sending a message to the hub when i == %d", i);
promises.push(client.send({ body: `Hello World ${i}` }));
}
}
const result = await Promise.all(promises);
for (let i = 0; i < result.length; i++) {
const delivery = result[i];
// debug("delivery %d: %O", i, delivery);
delivery.id.should.equal(0);
delivery.format.should.equal(0);
delivery.settled.should.equal(true);
delivery.remote_settled.should.equal(true);
delivery.tag.toString().should.equal(`0`);
}
} catch (err) {
debug("An error occurred while running the test: ", err);
throw err;
} finally {
for (let i = 0; i < senderCount; i++) {
await senders[i].close();
}
}
});
});
describe("Negative scenarios", function () {
it("a message greater than 256 KB should fail.", async function () {
sender = client.createSender();
sender.should.be.instanceof(EventHubSender);
let data: EventData = {
body: Buffer.from("Z".repeat(300000))
}
try {
await sender.send(data);
await client.send(data);
} catch (err) {
debug(err);
should.exist(err);
should.equal(err.name, "MessageTooLargeError");
err.message.should.match(/.*The received message \(delivery-id:0, size:300016 bytes\) exceeds the limit \(262144 bytes\) currently allowed on the link\..*/ig);
err.message.should.match(/.*The received message \(delivery-id:(\d+), size:3000\d\d bytes\) exceeds the limit \(262144 bytes\) currently allowed on the link\..*/ig);
}
});
@ -230,9 +174,8 @@ describe("EventHub Sender", function () {
//const id = invalidIds[5];
it(`"${id}" should throw an error`, async function () {
try {
sender = client.createSender(id);
debug("Created sender and will be sending a message to partition id ...", id);
await sender.send({ body: "Hello world!" });
await client.send({ body: "Hello world!" }, id);
debug("sent the message.");
} catch (err) {
debug(`>>>> Received error for invalid partition id "${id}" - `, err);