Tests for pollQueue function
Including a hacky attempt to measure and assert effect of rate limiting. Fixes #8
This commit is contained in:
Родитель
3ac9bec465
Коммит
4cb6dd5e78
|
@ -1,10 +1,179 @@
|
|||
const { expect } = require("chai");
|
||||
const sinon = require("sinon");
|
||||
|
||||
const {
|
||||
resetMocks,
|
||||
makePromiseFn,
|
||||
mocks,
|
||||
env: { CONFIG_TABLE, QUEUE_NAME, PROCESS_QUEUE_FUNCTION },
|
||||
constants: { QueueUrl },
|
||||
constantsModule
|
||||
} = global;
|
||||
|
||||
const { EXECUTION_MUTEX_KEY, RATE_LIMIT } = global.constantsModule;
|
||||
|
||||
// NOTE: Import the test subject as late as possible so that the mocks work
|
||||
const pollQueue = require("./pollQueue");
|
||||
|
||||
const wait = delay => new Promise(resolve => setTimeout(resolve, delay));
|
||||
|
||||
describe("functions/pollQueue.handler", () => {
|
||||
it("should exist", () => {
|
||||
expect(pollQueue.handler).to.not.be.undefined;
|
||||
const subject = pollQueue.handler;
|
||||
|
||||
const logMethods = ["log", "warn", "info", "time", "timeEnd"];
|
||||
|
||||
beforeEach(() => {
|
||||
resetMocks();
|
||||
logMethods.forEach(name => sinon.spy(console, name));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
logMethods.forEach(name => console[name].restore());
|
||||
});
|
||||
|
||||
it("should exit if another instance is already running", async () => {
|
||||
mocks.putItem.returns({
|
||||
promise: () => {
|
||||
throw "Fail";
|
||||
}
|
||||
});
|
||||
|
||||
await subject();
|
||||
|
||||
expect(mocks.putItem.called).to.be.true;
|
||||
const putArg = mocks.putItem.firstCall.args[0];
|
||||
expect(putArg.TableName).to.equal(CONFIG_TABLE);
|
||||
expect(putArg.Item.key).to.equal(EXECUTION_MUTEX_KEY);
|
||||
expect(console.warn.firstCall.args).to.deep.equal([
|
||||
"Could not acquire execution mutex",
|
||||
"Fail"
|
||||
]);
|
||||
});
|
||||
|
||||
it("should exit when remaining execution time is close to exhausted", async () => {
|
||||
const getRemainingTimeInMillis = sinon.stub().returns(500);
|
||||
|
||||
await subject({}, { getRemainingTimeInMillis });
|
||||
|
||||
expect(mocks.putItem.called).to.be.true;
|
||||
const putArg = mocks.putItem.firstCall.args[0];
|
||||
expect(mocks.putItem.firstCall.args[0].TableName).to.equal(CONFIG_TABLE);
|
||||
expect(putArg.Item.key).to.equal(EXECUTION_MUTEX_KEY);
|
||||
|
||||
expect(getRemainingTimeInMillis.called).to.be.true;
|
||||
|
||||
const infoArgs = console.info.args.map(([msg]) => msg);
|
||||
expect(infoArgs).to.deep.equal([
|
||||
"Execution mutex acquired",
|
||||
"Poller start",
|
||||
"Poller exit",
|
||||
"Execution mutex released"
|
||||
]);
|
||||
|
||||
expect(mocks.deleteItem.called).to.be.true;
|
||||
const deleteArg = mocks.deleteItem.firstCall.args[0];
|
||||
expect(deleteArg.TableName).to.equal(CONFIG_TABLE);
|
||||
expect(deleteArg.Key.key).to.equal(EXECUTION_MUTEX_KEY);
|
||||
});
|
||||
|
||||
it("should process one message by invoking one lambda function", async () => {
|
||||
const requestId = "8675309";
|
||||
const messageBody = { requestId, testing: "testing" };
|
||||
const Messages = [{ Body: JSON.stringify(messageBody) }];
|
||||
|
||||
mocks.receiveMessage.returns(makePromiseFn({ Messages }));
|
||||
|
||||
const getRemainingTimeInMillis = sinon.stub();
|
||||
[20000, 2000, 200, 20].forEach((time, idx) =>
|
||||
getRemainingTimeInMillis.onCall(idx).returns(time)
|
||||
);
|
||||
|
||||
Object.assign(constantsModule, {
|
||||
POLL_DELAY: 10
|
||||
});
|
||||
|
||||
await subject({}, { getRemainingTimeInMillis });
|
||||
|
||||
expect(mocks.getQueueUrl.callCount).to.equal(1);
|
||||
expect(mocks.getQueueUrl.lastCall.args[0]).to.deep.equal({
|
||||
QueueName: QUEUE_NAME
|
||||
});
|
||||
|
||||
expect(mocks.receiveMessage.callCount).to.equal(1);
|
||||
expect(mocks.receiveMessage.lastCall.args[0]).to.deep.include({
|
||||
QueueUrl,
|
||||
MaxNumberOfMessages: RATE_LIMIT,
|
||||
MessageAttributeNames: ["All"]
|
||||
});
|
||||
|
||||
expect(mocks.invoke.callCount).to.equal(1);
|
||||
expect(mocks.invoke.args[0][0]).to.deep.equal({
|
||||
FunctionName: PROCESS_QUEUE_FUNCTION,
|
||||
InvocationType: "Event",
|
||||
LogType: "None",
|
||||
Payload: JSON.stringify(Messages[0])
|
||||
});
|
||||
|
||||
const infoArgs = console.info.args.map(([msg]) => msg);
|
||||
expect(infoArgs).to.deep.equal([
|
||||
"Execution mutex acquired",
|
||||
"Poller start",
|
||||
"Pausing for",
|
||||
"Remaining",
|
||||
"Poller exit",
|
||||
"Execution mutex released"
|
||||
]);
|
||||
|
||||
const timeEndArgs = console.timeEnd.args.map(([msg]) => msg);
|
||||
expect(timeEndArgs).to.deep.equal([
|
||||
"SQS",
|
||||
`Message ${requestId}`,
|
||||
"Worker batch",
|
||||
"pollQueue 1"
|
||||
]);
|
||||
});
|
||||
|
||||
it("should respect rate limiting in message processing", async () => {
|
||||
Object.assign(constantsModule, {
|
||||
RATE_LIMIT: 5,
|
||||
RATE_PERIOD: 250,
|
||||
MAX_LONG_POLL_PERIOD: 20,
|
||||
POLL_DELAY: 50
|
||||
});
|
||||
|
||||
const testMessages = [];
|
||||
for (let i = 0; i < 20; i++) {
|
||||
testMessages.push({
|
||||
Body: JSON.stringify({ requestId: i, testing: "testing" })
|
||||
});
|
||||
}
|
||||
|
||||
const mockReceiveMessage = ({ MaxNumberOfMessages }) => ({
|
||||
promise: async () => {
|
||||
await wait(50);
|
||||
return { Messages: testMessages.splice(0, MaxNumberOfMessages) };
|
||||
}
|
||||
});
|
||||
|
||||
mocks.receiveMessage.callsFake(mockReceiveMessage);
|
||||
|
||||
const executionPeriod = 1500;
|
||||
const limitTime = Date.now() + executionPeriod;
|
||||
const getRemainingTimeInMillis = sinon
|
||||
.stub()
|
||||
.callsFake(() => limitTime - Date.now());
|
||||
|
||||
const startTime = Date.now();
|
||||
await subject({}, { getRemainingTimeInMillis });
|
||||
const endTime = Date.now();
|
||||
|
||||
const duration = endTime - startTime;
|
||||
const calls = mocks.invoke.callCount;
|
||||
const resultRate = calls / (duration / constantsModule.RATE_PERIOD);
|
||||
const yieldMessages = console.log.args.filter(([msg]) =>
|
||||
msg.includes("Yielding")
|
||||
);
|
||||
|
||||
expect(resultRate < constantsModule.RATE_LIMIT).to.be.true;
|
||||
expect(yieldMessages.length > 0).to.be.true;
|
||||
});
|
||||
});
|
||||
|
|
|
@ -5,65 +5,42 @@ const DBD = new AWS.DynamoDB.DocumentClient();
|
|||
const SQS = new AWS.SQS({ apiVersion: "2012-11-05" });
|
||||
const Lambda = new AWS.Lambda({ apiVersion: "2015-03-31" });
|
||||
|
||||
const { CONFIG_TABLE, QUEUE_NAME, PROCESS_QUEUE_FUNCTION } = process.env;
|
||||
|
||||
const RATE_LIMIT = 5;
|
||||
const RATE_PERIOD = 1000;
|
||||
const MAX_LONG_POLL_PERIOD = 20;
|
||||
const POLL_DELAY = 100;
|
||||
const EXECUTION_MUTEX_KEY = "pollQueueExecutionExpires";
|
||||
const EXECUTION_MUTEX_TTL = 50 * 1000;
|
||||
|
||||
const wait = delay => new Promise(resolve => setTimeout(resolve, delay));
|
||||
|
||||
// Running list of timestamps for hits on rate limit
|
||||
let rateHits = [];
|
||||
let rateHits;
|
||||
|
||||
module.exports.handler = async function(event, context) {
|
||||
const now = Date.now();
|
||||
const constants = require("../lib/constants");
|
||||
const { POLL_DELAY } = constants;
|
||||
|
||||
try {
|
||||
await DBD.put({
|
||||
TableName: CONFIG_TABLE,
|
||||
Item: {
|
||||
key: EXECUTION_MUTEX_KEY,
|
||||
value: now + EXECUTION_MUTEX_TTL
|
||||
},
|
||||
ConditionExpression: "#key <> :key OR (#key = :key AND #value < :value)",
|
||||
ExpressionAttributeNames: { "#key": "key", "#value": "value" },
|
||||
ExpressionAttributeValues: {
|
||||
":key": EXECUTION_MUTEX_KEY,
|
||||
":value": Date.now()
|
||||
}
|
||||
}).promise();
|
||||
await acquireExecutionLock(process.env, constants);
|
||||
} catch (err) {
|
||||
console.warn("Could not acquire execution mutex", err);
|
||||
return;
|
||||
}
|
||||
console.info("Execution mutex acquired");
|
||||
|
||||
rateHits = [];
|
||||
let polls = 0;
|
||||
console.log("Poller start");
|
||||
do {
|
||||
console.info("Poller start");
|
||||
while (Math.floor(context.getRemainingTimeInMillis() / 1000) >= 1) {
|
||||
try {
|
||||
const tname = `pollQueue ${++polls}`;
|
||||
console.time(tname);
|
||||
await pollQueue(context);
|
||||
await pollQueue(process.env, constants, context);
|
||||
console.timeEnd(tname);
|
||||
} catch (err) {
|
||||
console.error("Error in pollQueue", err);
|
||||
return;
|
||||
}
|
||||
console.info("Pausing for", POLL_DELAY, "ms");
|
||||
await wait(POLL_DELAY);
|
||||
console.log("Remaining", context.getRemainingTimeInMillis(), "ms");
|
||||
} while (Math.floor(context.getRemainingTimeInMillis() / 1000) > 1);
|
||||
console.log("Poller exit");
|
||||
console.info("Remaining", context.getRemainingTimeInMillis(), "ms");
|
||||
}
|
||||
console.info("Poller exit");
|
||||
|
||||
try {
|
||||
await DBD.delete({
|
||||
TableName: CONFIG_TABLE,
|
||||
Key: { key: EXECUTION_MUTEX_KEY }
|
||||
}).promise();
|
||||
await releaseExecutionLock(process.env, constants);
|
||||
} catch (err) {
|
||||
console.warn("Could not release execution mutex", err);
|
||||
return;
|
||||
|
@ -71,12 +48,45 @@ module.exports.handler = async function(event, context) {
|
|||
console.info("Execution mutex released");
|
||||
};
|
||||
|
||||
async function pollQueue(context) {
|
||||
const wait = delay => new Promise(resolve => setTimeout(resolve, delay));
|
||||
|
||||
const acquireExecutionLock = (
|
||||
{ CONFIG_TABLE },
|
||||
{ EXECUTION_MUTEX_KEY, EXECUTION_MUTEX_TTL }
|
||||
) =>
|
||||
DBD.put({
|
||||
TableName: CONFIG_TABLE,
|
||||
Item: {
|
||||
key: EXECUTION_MUTEX_KEY,
|
||||
value: Date.now() + EXECUTION_MUTEX_TTL
|
||||
},
|
||||
ConditionExpression: "#key <> :key OR (#key = :key AND #value < :value)",
|
||||
ExpressionAttributeNames: { "#key": "key", "#value": "value" },
|
||||
ExpressionAttributeValues: {
|
||||
":key": EXECUTION_MUTEX_KEY,
|
||||
":value": Date.now()
|
||||
}
|
||||
}).promise();
|
||||
|
||||
const releaseExecutionLock = (
|
||||
{ CONFIG_TABLE },
|
||||
{ EXECUTION_MUTEX_KEY, EXECUTION_MUTEX_TTL }
|
||||
) =>
|
||||
DBD.delete({
|
||||
TableName: CONFIG_TABLE,
|
||||
Key: { key: EXECUTION_MUTEX_KEY }
|
||||
}).promise();
|
||||
|
||||
async function pollQueue(
|
||||
{ QUEUE_NAME, PROCESS_QUEUE_FUNCTION },
|
||||
{ MAX_LONG_POLL_PERIOD, RATE_PERIOD, RATE_LIMIT },
|
||||
context
|
||||
) {
|
||||
// Calculate seconds remaining for poller execution, using maximum for
|
||||
// long poll or whatever time we have left
|
||||
const WaitTimeSeconds = Math.min(
|
||||
MAX_LONG_POLL_PERIOD,
|
||||
Math.floor(context.getRemainingTimeInMillis() / 1000) - 1
|
||||
Math.floor(context.getRemainingTimeInMillis() / 1000)
|
||||
);
|
||||
if (WaitTimeSeconds <= 0) {
|
||||
console.log("Out of time");
|
||||
|
|
|
@ -1,8 +1,15 @@
|
|||
exports.DEFAULT_HAWK_ALGORITHM = "sha256";
|
||||
|
||||
exports.DEV_CREDENTIALS = {
|
||||
devuser: {
|
||||
key: "devkey",
|
||||
algorithm: exports.DEFAULT_HAWK_ALGORITHM
|
||||
}
|
||||
module.exports = {
|
||||
DEFAULT_HAWK_ALGORITHM: "sha256",
|
||||
DEV_CREDENTIALS: {
|
||||
devuser: {
|
||||
key: "devkey",
|
||||
algorithm: "sha256"
|
||||
}
|
||||
},
|
||||
RATE_LIMIT: 5,
|
||||
RATE_PERIOD: 1000,
|
||||
MAX_LONG_POLL_PERIOD: 20,
|
||||
POLL_DELAY: 100,
|
||||
EXECUTION_MUTEX_KEY: "pollQueueExecutionExpires",
|
||||
EXECUTION_MUTEX_TTL: 50 * 1000
|
||||
};
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
const sinon = require("sinon");
|
||||
const AWS = require("aws-sdk");
|
||||
const request = require("request-promise-native");
|
||||
const mockRequire = require("mock-require");
|
||||
|
||||
global.env = {
|
||||
CONFIG_TABLE: "test-config",
|
||||
CREDENTIALS_TABLE: "test-credentials",
|
||||
QUEUE_NAME: "test-queue",
|
||||
CONTENT_BUCKET: "test-bucket",
|
||||
PROCESS_QUEUE_FUNCTION: "process-queue-item",
|
||||
UPSTREAM_SERVICE_URL: "https://api.example.com/v1.0/Match",
|
||||
UPSTREAM_SERVICE_KEY: "1234567890"
|
||||
};
|
||||
|
@ -29,14 +32,36 @@ global.constants = {
|
|||
}
|
||||
};
|
||||
|
||||
const defaultConstantsModule = {
|
||||
DEFAULT_HAWK_ALGORITHM: "sha256",
|
||||
DEV_CREDENTIALS: {
|
||||
devuser: {
|
||||
key: "devkey",
|
||||
algorithm: "sha256"
|
||||
}
|
||||
},
|
||||
RATE_LIMIT: 5,
|
||||
RATE_PERIOD: 500,
|
||||
MAX_LONG_POLL_PERIOD: 20,
|
||||
POLL_DELAY: 100,
|
||||
EXECUTION_MUTEX_KEY: "pollQueueExecutionExpires",
|
||||
EXECUTION_MUTEX_TTL: 50 * 1000
|
||||
};
|
||||
global.constantsModule = Object.assign({}, defaultConstantsModule);
|
||||
mockRequire("./constants", global.constantsModule);
|
||||
|
||||
global.mocks = {
|
||||
deleteMessage: (AWS.SQS.prototype.deleteMessage = sinon.stub()),
|
||||
getItem: (AWS.DynamoDB.DocumentClient.prototype.get = sinon.stub()),
|
||||
putItem: (AWS.DynamoDB.DocumentClient.prototype.put = sinon.stub()),
|
||||
deleteItem: (AWS.DynamoDB.DocumentClient.prototype.delete = sinon.stub()),
|
||||
getQueueUrl: (AWS.SQS.prototype.getQueueUrl = sinon.stub()),
|
||||
getSignedUrl: (AWS.S3.prototype.getSignedUrl = sinon.stub()),
|
||||
putObject: (AWS.S3.prototype.putObject = sinon.stub()),
|
||||
requestPost: (request.post = sinon.stub()),
|
||||
sendMessage: (AWS.SQS.prototype.sendMessage = sinon.stub())
|
||||
sendMessage: (AWS.SQS.prototype.sendMessage = sinon.stub()),
|
||||
receiveMessage: (AWS.SQS.prototype.receiveMessage = sinon.stub()),
|
||||
invoke: (AWS.Lambda.prototype.invoke = sinon.stub())
|
||||
};
|
||||
|
||||
global.makePromiseFn = out => ({ promise: () => Promise.resolve(out) });
|
||||
|
@ -48,16 +73,20 @@ global.resetMocks = () => {
|
|||
constants: { QueueUrl, MessageId, ETag }
|
||||
} = global;
|
||||
|
||||
Object.assign(global.constantsModule, defaultConstantsModule);
|
||||
Object.assign(process.env, global.env);
|
||||
|
||||
Object.values(global.mocks).forEach(mock => mock.resetHistory());
|
||||
|
||||
mocks.requestPost.resolves({});
|
||||
|
||||
mocks.deleteMessage.returns(makePromiseFn({}));
|
||||
mocks.deleteItem.returns(makePromiseFn({}));
|
||||
mocks.putItem.returns(makePromiseFn({}));
|
||||
mocks.getItem.returns(makePromiseFn({}));
|
||||
mocks.getQueueUrl.returns(makePromiseFn({ QueueUrl }));
|
||||
mocks.getSignedUrl.returns("");
|
||||
mocks.putObject.returns(makePromiseFn({ ETag }));
|
||||
mocks.sendMessage.returns(makePromiseFn({ MessageId }));
|
||||
mocks.receiveMessage.returns(makePromiseFn({ MessageId }));
|
||||
mocks.invoke.returns(makePromiseFn({}));
|
||||
};
|
||||
|
|
|
@ -5371,6 +5371,27 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"mock-require": {
|
||||
"version": "3.0.2",
|
||||
"resolved": "https://registry.npmjs.org/mock-require/-/mock-require-3.0.2.tgz",
|
||||
"integrity": "sha512-aD/Y1ZFHqw5pHg3HVQ50dLbfaAAcytS6sqLuhP51Dk3TSPdFb2VkSAa3mjrHifLIlGAtwQHJHINafAyqAne7vA==",
|
||||
"dev": true,
|
||||
"requires": {
|
||||
"get-caller-file": "^1.0.2",
|
||||
"normalize-path": "^2.1.1"
|
||||
},
|
||||
"dependencies": {
|
||||
"normalize-path": {
|
||||
"version": "2.1.1",
|
||||
"resolved": "https://registry.npmjs.org/normalize-path/-/normalize-path-2.1.1.tgz",
|
||||
"integrity": "sha1-GrKLVW4Zg2Oowab35vogE3/mrtk=",
|
||||
"dev": true,
|
||||
"requires": {
|
||||
"remove-trailing-separator": "^1.0.1"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"moment": {
|
||||
"version": "2.22.1",
|
||||
"resolved": "https://registry.npmjs.org/moment/-/moment-2.22.1.tgz",
|
||||
|
|
|
@ -47,6 +47,7 @@
|
|||
"husky": "0.14.3",
|
||||
"lint-staged": "7.1.2",
|
||||
"mocha": "5.2.0",
|
||||
"mock-require": "^3.0.2",
|
||||
"npm-run-all": "4.1.3",
|
||||
"nsp": "3.2.1",
|
||||
"onchange": "4.0.0",
|
||||
|
|
Загрузка…
Ссылка в новой задаче