зеркало из
1
0
Форкнуть 0

Merge pull request #169 from princjef/empty-messageid

Only set message_id when provided by caller
This commit is contained in:
Amar Zavery 2018-10-25 16:14:29 -07:00 коммит произвёл GitHub
Родитель dff107968b 06afe30fa2
Коммит c677dba0a1
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 34 добавлений и 16 удалений

Просмотреть файл

@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved. // Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.
import * as uuid from "uuid/v4";
import { import {
Message, MessageProperties, MessageHeader, Dictionary, messageHeader, messageProperties, Message, MessageProperties, MessageHeader, Dictionary, messageHeader, messageProperties,
MessageAnnotations, DeliveryAnnotations MessageAnnotations, DeliveryAnnotations
@ -199,9 +198,6 @@ export namespace EventData {
(msg as any)[prop] = (data.properties as any)[prop]; (msg as any)[prop] = (data.properties as any)[prop];
} }
} }
if (!msg.message_id) {
msg.message_id = uuid();
}
if (data.applicationProperties) { if (data.applicationProperties) {
msg.application_properties = data.applicationProperties; msg.application_properties = data.applicationProperties;
} }

Просмотреть файл

@ -310,10 +310,6 @@ export class EventHubSender extends LinkEntity {
} }
} }
if (!batchMessage.message_id) {
batchMessage.message_id = uuid();
}
// Finally encode the envelope (batch message). // Finally encode the envelope (batch message).
const encodedBatchMessage = message.encode(batchMessage); const encodedBatchMessage = message.encode(batchMessage);
log.sender("[%s] Sender '%s', sending encoded batch message.", log.sender("[%s] Sender '%s', sending encoded batch message.",
@ -365,7 +361,7 @@ export class EventHubSender extends LinkEntity {
this._sender!.credit, this._sender!.session.outgoing.available()); this._sender!.credit, this._sender!.session.outgoing.available());
if (this._sender!.sendable()) { if (this._sender!.sendable()) {
log.sender("[%s] Sender '%s', sending message with id '%s'.", this._context.connectionId, log.sender("[%s] Sender '%s', sending message with id '%s'.", this._context.connectionId,
this.name, message.message_id || tag); this.name, message.message_id || tag || '<not specified>');
let onRejected: Func<EventContext, void>; let onRejected: Func<EventContext, void>;
let onReleased: Func<EventContext, void>; let onReleased: Func<EventContext, void>;
let onModified: Func<EventContext, void>; let onModified: Func<EventContext, void>;

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information. // Licensed under the MIT license. See LICENSE file in the project root for full license information.
import "mocha"; import "mocha";
import * as uuid from "uuid/v4";
import * as chai from "chai"; import * as chai from "chai";
import * as assert from "assert"; import * as assert from "assert";
const should = chai.should(); const should = chai.should();
@ -38,8 +39,10 @@ describe("Misc tests", function () {
const msgString = "A".repeat(220 * 1024); const msgString = "A".repeat(220 * 1024);
const msgBody = Buffer.from(msgString); const msgBody = Buffer.from(msgString);
const obj: EventData = { body: msgBody }; const obj: EventData = { body: msgBody };
const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
debug("Sending one message with %d bytes.", bodysize); debug("Sending one message with %d bytes.", bodysize);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
let data = await breceiver.receive(5, 5); let data = await breceiver.receive(5, 5);
data.length.should.equal(0); data.length.should.equal(0);
await client.send(obj, partitionId); await client.send(obj, partitionId);
@ -51,6 +54,7 @@ describe("Misc tests", function () {
should.exist(data); should.exist(data);
data.length.should.equal(1); data.length.should.equal(1);
data[0].body.toString().should.equal(msgString); data[0].body.toString().should.equal(msgString);
should.not.exist((data[0].properties || {}).message_id);
}); });
it("should be able to send and receive a JSON object as a message correctly", async function () { it("should be able to send and receive a JSON object as a message correctly", async function () {
@ -68,8 +72,10 @@ describe("Misc tests", function () {
] ]
}; };
const obj: EventData = { body: msgBody }; const obj: EventData = { body: msgBody };
const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
debug("Sending one message %O", obj); debug("Sending one message %O", obj);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
await client.send(obj, partitionId); await client.send(obj, partitionId);
debug("Successfully sent the large message."); debug("Successfully sent the large message.");
const data = await breceiver.receive(5, 10); const data = await breceiver.receive(5, 10);
@ -79,6 +85,7 @@ describe("Misc tests", function () {
data.length.should.equal(1); data.length.should.equal(1);
debug("Received message: %O", data); debug("Received message: %O", data);
assert.deepEqual(data[0].body, msgBody); assert.deepEqual(data[0].body, msgBody);
should.not.exist((data[0].properties || {}).message_id);
}); });
it("should be able to send and receive an array as a message correctly", async function () { it("should be able to send and receive an array as a message correctly", async function () {
@ -93,9 +100,11 @@ describe("Misc tests", function () {
20, 20,
"some string" "some string"
]; ];
const obj: EventData = { body: msgBody }; const obj: EventData = { body: msgBody, properties: { message_id: uuid() } };
const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
debug("Sending one message %O", obj); debug("Sending one message %O", obj);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
await client.send(obj, partitionId); await client.send(obj, partitionId);
debug("Successfully sent the large message."); debug("Successfully sent the large message.");
const data = await breceiver.receive(5, 5); const data = await breceiver.receive(5, 5);
@ -105,14 +114,17 @@ describe("Misc tests", function () {
data.length.should.equal(1); data.length.should.equal(1);
debug("Received message: %O", data); debug("Received message: %O", data);
assert.deepEqual(data[0].body, msgBody); assert.deepEqual(data[0].body, msgBody);
assert.strictEqual(data[0].properties.message_id, obj.properties.message_id);
}); });
it("should be able to send a boolean as a message correctly", async function () { it("should be able to send a boolean as a message correctly", async function () {
const partitionId = hubInfo.partitionIds[0]; const partitionId = hubInfo.partitionIds[0];
const msgBody = true; const msgBody = true;
const obj: EventData = { body: msgBody }; const obj: EventData = { body: msgBody };
const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
debug("Sending one message %O", obj); debug("Sending one message %O", obj);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
await client.send(obj, partitionId); await client.send(obj, partitionId);
debug("Successfully sent the large message."); debug("Successfully sent the large message.");
const data = await breceiver.receive(5, 5); const data = await breceiver.receive(5, 5);
@ -122,12 +134,15 @@ describe("Misc tests", function () {
data.length.should.equal(1); data.length.should.equal(1);
debug("Received message: %O", data); debug("Received message: %O", data);
assert.deepEqual(data[0].body, msgBody); assert.deepEqual(data[0].body, msgBody);
should.not.exist((data[0].properties || {}).message_id);
}); });
it("should be able to send and receive batched messages correctly", async function () { it("should be able to send and receive batched messages correctly", async function () {
try { try {
const partitionId = hubInfo.partitionIds[0]; const partitionId = hubInfo.partitionIds[0];
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
let data = await breceiver.receive(5, 10); let data = await breceiver.receive(5, 10);
data.length.should.equal(0); data.length.should.equal(0);
const messageCount = 5; const messageCount = 5;
@ -145,6 +160,9 @@ describe("Misc tests", function () {
debug("received message: ", data); debug("received message: ", data);
should.exist(data); should.exist(data);
data.length.should.equal(5); data.length.should.equal(5);
for (const message of data) {
should.not.exist((message.properties || {}).message_id);
}
} catch (err) { } catch (err) {
debug("should not have happened, uber catch....", err); debug("should not have happened, uber catch....", err);
throw err; throw err;
@ -154,7 +172,9 @@ describe("Misc tests", function () {
it("should be able to send and receive batched messages as JSON objects correctly", async function () { it("should be able to send and receive batched messages as JSON objects correctly", async function () {
try { try {
const partitionId = hubInfo.partitionIds[0]; const partitionId = hubInfo.partitionIds[0];
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
let data = await breceiver.receive(5, 5); let data = await breceiver.receive(5, 5);
data.length.should.equal(0); data.length.should.equal(0);
const messageCount = 5; const messageCount = 5;
@ -173,6 +193,9 @@ describe("Misc tests", function () {
isBlue: false, isBlue: false,
} }
] ]
},
properties: {
message_id: uuid()
} }
}; };
d.push(obj); d.push(obj);
@ -187,6 +210,9 @@ describe("Misc tests", function () {
should.exist(data); should.exist(data);
data[0].body.count.should.equal(0); data[0].body.count.should.equal(0);
data.length.should.equal(5); data.length.should.equal(5);
for (const [index, message] of data.entries()) {
assert.strictEqual(message.properties.message_id, d[index].properties.message_id);
}
} catch (err) { } catch (err) {
debug("should not have happened, uber catch....", err); debug("should not have happened, uber catch....", err);
throw err; throw err;