Implement metrics pings
- Tweaks to metrics.md to reflect current functionality
- lib/metrics.js to wrap metrics pings
- Add pings to functions
- Mock endpoint that just logs POST requests, used for metrics in dev stack
- Tests

Fixes #47
This commit is contained in:
Parent: f94bbc79fa
Commit: 44eba231b6
@@ -154,5 +154,6 @@ When using `serverless deploy` to deploy the stack, you can use several environm
 - `UPSTREAM_SERVICE_KEY` - the private subscription key for the upstream web service
 - `ENABLE_DEV_AUTH=1` - This enables a hardcoded user id / key for development (off by default)
 - `DISABLE_AUTH_CACHE=1` - Authentication credentials are cached in memory in the `accept` API function. This lasts until AWS recycles the container hosting the function. Setting this variable disables the cache.
+- `METRICS_URL` - Override for Ping Centre service URL used for internal metrics. By default, the stage or production Ping Centre URL is used based on `NODE_ENV`
 
 You can see these variables used by scripts defined in `package.json` for development convenience.
@@ -1,5 +1,5 @@
 # Watchdog Metrics
-*Last Update: 2018-05-22*
+*Last Update: 2018-06-08*
 
 ## Analysis
 Questions we want to answer with metrics data include:
@@ -36,7 +36,7 @@ Additional fields submitted are described below.
 - *consumer_name*: the name of the consumer submitting the request: string
 - *event*: "new_item": string
 - *watchdog_id*: the ID assigned to the task: string
-- *type*: Type of item submitted (e.g. 'png' or 'jpg'): string
+- *type*: Content-Type of item submitted (e.g. 'image/png' or 'image/jpg'): string
 
 Example:
 ```
@@ -47,19 +47,27 @@ Example:
   "consumer_name": "screenshots",
   "event": "new_item",
   "watchdog_id": "9ad08ec4-be1a-4327-b4ef-282bed37621f",
-  "type": "png"
+  "type": "image/png"
 }
 ```
 
-### A worker wakes up
-A worker wakes up periodically to process the queue. When it wakes up it
-selects a portion of the queue to process and it shuts down when it finishes
-processing them. When the worker wakes up *or* shuts down, it will send:
-- *event*: "worker_awakes": string
+### Queue poller periodic heartbeat
+The `pollQueue` function repeatedly polls the queue for jobs waiting to be
+processed. It gets called every 60 seconds and runs for most of 60 seconds
+before exiting. (This is a hack to work around lacking support for long-running
+functions in AWS Lambda.)
+
+Metrics pings will be sent at these times while the `pollQueue` function is running:
+- when the function starts (every 60 seconds)
+- roughly every 20 seconds while it runs
+- when the function exits (roughly 60 seconds after start)
+
+The metrics sent in the ping will contain:
+- *event*: "poller_heartbeat": string
+- *poller_id*: UUID given by Lambda to the current invocation of the `pollQueue` function
+- *items_in_queue*: Number of items in the queue before the worker removes any: integer
+- *items_in_progress*: Number of items being processed: integer
+- *items_in_waiting*: Number of items waiting to be queued: integer
+- *items_to_claim*: Number of items the worker will take out: integer
 
 Example:
 ```
@@ -67,23 +75,23 @@ Example:
   "topic": "watchdog-proxy",
   "timestamp": "2018-05-18T16:38:33.464Z",
-  "event": "worker_awakes",
+  "event": "poller_heartbeat",
+  "poller_id": "31417de1-b3ef-4e90-be3c-e5116d459d1d",
   "items_in_queue": 1504,
   "items_in_progress": 22,
-  "items_in_waiting": 38,
-  "items_to_claim": 250
+  "items_in_waiting": 38
 }
 ```
 
-### A worker processes the queue
-For *each* item it processes:
+### A worker processes a queue item
+For *each* item fetched from the queue by the poller, the `processQueueItem` function will be invoked. That function, in turn, will send these metrics:
 - *event*: "worker_works": string
+- *worker_id*: UUID given by Lambda to the current invocation of the `processQueueItem` function
 - *consumer_name*: the ID of the consumer submitting the request: string
 - *watchdog_id*: the ID assigned to the task: string
+- *photodna_tracking_id*: ID from PhotoDNA: string
 - *is_match*: Whether the response was positive or negative: boolean
 - *is_error*: Was the response an error?: boolean
 - *timing_retrieved*: time (in ms) to retrieve item from queue: integer
 - *timing_sent*: time (in ms) to send item to PhotoDNA: integer
 - *timing_received*: time (in ms) before response from PhotoDNA: integer
 - *timing_submitted*: time (in ms) to finish sending a response to consumer's report URL: integer
@@ -95,32 +103,15 @@ Example:
   "timestamp": "2018-05-18T16:38:33.464Z",
   "event": "worker_works",
   "worker_id": "8cdb1e6b-7e15-489d-b171-e7a05781c5da",
   "consumer_name": "screenshots",
   "watchdog_id": "9ad08ec4-be1a-4327-b4ef-282bed37621f",
   "photodna_tracking_id": "1_photodna_a0e3d02b-1a0a-4b38-827f-764acd288c25",
   "is_match": false,
-  "is_error": false
+  "is_error": false,
+  "timing_retrieved": 8,
+  "timing_sent": 89,
+  "timing_received": 161,
+  "timing_submitted": 35
 }
 ```
-
-### A worker shuts down
-When a worker finishes the work it claimed it shuts down. When it does, it will
-send:
-- *event*: "worker_sleeps": string
-- *items_processed*: Number of items the worker processed successfully: integer
-
-Example:
-```
-{
-  "topic": "watchdog-proxy",
-  "timestamp": "2018-05-18T16:38:33.464Z",
-  "event": "worker_sleeps",
-  "items_processed": 250
-}
-```
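All the pings documented above share the same `topic` and `timestamp` envelope around event-specific fields. A minimal sketch of how such a body can be assembled (the `buildPingBody` helper is illustrative, not part of this commit):

```javascript
// Illustrative helper: assembles the common ping envelope documented above.
// `buildPingBody` is a hypothetical name, not part of the committed code.
const buildPingBody = (data = {}) =>
  Object.assign(
    {
      topic: "watchdog-proxy",
      timestamp: new Date().toISOString()
    },
    data
  );

const ping = buildPingBody({
  event: "new_item",
  consumer_name: "screenshots",
  watchdog_id: "9ad08ec4-be1a-4327-b4ef-282bed37621f",
  type: "image/png"
});
// `ping` now carries the shared envelope plus the event-specific fields
```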
@@ -1,3 +1,4 @@
+const sinon = require("sinon");
 const { expect } = require("chai");
 const Hawk = require("hawk");
 
@@ -10,13 +11,21 @@ const {
   constants: { QueueUrl, requestId }
 } = global;
 
+const Metrics = require("../lib/metrics");
 const accept = require("./accept");
 
 describe("functions/accept.post", () => {
+  let metricsStub;
+
   beforeEach(() => {
     global.resetMocks();
     process.env.ENABLE_DEV_AUTH = "1";
     process.env.DISABLE_AUTH_CACHE = "1";
+    metricsStub = sinon.stub(Metrics, "newItem");
   });
 
+  afterEach(() => {
+    metricsStub.restore();
+  });
+
   describe("Hawk authentication", () => {
@@ -149,6 +158,8 @@ describe("functions/accept.post", () => {
     const body = Object.assign({}, DEFAULT_POST_BODY);
     delete body.image;
 
+    process.env.METRICS_URL = "https://example.com";
+
     const result = await acceptPost({
       httpMethod: "POST",
       proto: "https",
@@ -224,6 +235,13 @@ describe("functions/accept.post", () => {
       ContentType: "application/json"
     });
 
+    expect(metricsStub.called).to.be.true;
+    expect(metricsStub.args[0][0]).to.deep.include({
+      consumer_name: id,
+      watchdog_id: requestId,
+      type: imageContentType
+    });
+
     expect(result.statusCode).to.equal(201);
   });
 });
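The tests above isolate the network by replacing `Metrics.newItem` with a sinon stub and restoring it afterward. The bookkeeping sinon performs can be sketched dependency-free (the `stubMethod` helper below is illustrative, not part of the commit; `sinon.stub()` does the same job):

```javascript
// Hand-rolled illustration of the stub-and-restore pattern the tests use via sinon.
const Metrics = {
  newItem: () => {
    throw new Error("would hit the network");
  }
};

function stubMethod(obj, name) {
  const original = obj[name];
  const calls = []; // record arguments of every call, like sinon's stub.args
  obj[name] = (...args) => {
    calls.push(args);
    return Promise.resolve();
  };
  return { calls, restore: () => (obj[name] = original) };
}

const stub = stubMethod(Metrics, "newItem");
Metrics.newItem({ consumer_name: "screenshots" }); // safe: stubbed, no network
stub.restore(); // original (throwing) method back in place
```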
@@ -7,6 +7,7 @@ const S3 = new AWS.S3({ apiVersion: "2006-03-01" });
 const SQS = new AWS.SQS({ apiVersion: "2012-11-05" });
 const documentClient = new AWS.DynamoDB.DocumentClient();
 const { DEV_CREDENTIALS, DEFAULT_HAWK_ALGORITHM } = require("../lib/constants");
+const Metrics = require("../lib/metrics");
 
 const REQUIRED_FIELDS = ["image", "negative_uri", "positive_uri"];
 
@@ -105,6 +106,12 @@ module.exports.post = async function(event, context) {
   const { QueueUrl } = await SQS.getQueueUrl({ QueueName }).promise();
   await SQS.sendMessage({ QueueUrl, MessageBody }).promise();
 
+  await Metrics.newItem({
+    consumer_name: authArtifacts.id,
+    watchdog_id: requestId,
+    type: image.contentType
+  });
+
   return response(201, {
     id: requestId,
     negative_uri,
@@ -23,6 +23,11 @@ module.exports.clientPositivePost = async (event, context) => {
   return response(200, { status: "OK" });
 };
 
+module.exports.logPost = async (event, context) => {
+  console.log("body", event.body);
+  return response(200, { status: "OK" });
+};
+
 function response(statusCode, body, headers = {}) {
   return {
     statusCode,
@@ -10,8 +10,11 @@ const {
   constantsModule
 } = global;
 
+const awsRequestId = "test-uuid";
+
 const { EXECUTION_MUTEX_KEY, RATE_LIMIT } = global.constantsModule;
 
+const Metrics = require("../lib/metrics");
 const pollQueue = require("./pollQueue");
 
 const wait = delay => new Promise(resolve => setTimeout(resolve, delay));
@@ -21,13 +24,17 @@ describe("functions/pollQueue.handler", () => {
 
   const logMethods = ["log", "warn", "info", "time", "timeEnd"];
 
+  let metricsStub;
+
   beforeEach(() => {
     resetMocks();
     logMethods.forEach(name => sinon.spy(console, name));
+    metricsStub = sinon.stub(Metrics, "pollerHeartbeat");
   });
 
   afterEach(() => {
     logMethods.forEach(name => console[name].restore());
+    metricsStub.restore();
   });
 
   it("should exit if another instance is already running", async () => {
@@ -47,12 +54,13 @@ describe("functions/pollQueue.handler", () => {
       "Could not acquire execution mutex",
       "Fail"
     ]);
+    expect(metricsStub.callCount).to.equal(0);
   });
 
   it("should exit when remaining execution time is close to exhausted", async () => {
     const getRemainingTimeInMillis = sinon.stub().returns(500);
 
-    await subject({}, { getRemainingTimeInMillis });
+    await subject({}, { awsRequestId, getRemainingTimeInMillis });
 
     expect(mocks.putItem.called).to.be.true;
     const putArg = mocks.putItem.firstCall.args[0];
@@ -64,11 +72,15 @@ describe("functions/pollQueue.handler", () => {
     const infoArgs = console.info.args.map(([msg]) => msg);
     expect(infoArgs).to.deep.equal([
       "Execution mutex acquired",
+      "Sending heartbeat metrics",
       "Poller start",
       "Poller exit",
-      "Execution mutex released"
+      "Execution mutex released",
+      "Sending heartbeat metrics"
     ]);
 
+    expect(metricsStub.callCount).to.equal(2);
+
     expect(mocks.deleteItem.called).to.be.true;
     const deleteArg = mocks.deleteItem.firstCall.args[0];
     expect(deleteArg.TableName).to.equal(CONFIG_TABLE);
@@ -91,9 +103,9 @@ describe("functions/pollQueue.handler", () => {
       POLL_DELAY: 10
     });
 
-    await subject({}, { getRemainingTimeInMillis });
+    await subject({}, { awsRequestId, getRemainingTimeInMillis });
 
-    expect(mocks.getQueueUrl.callCount).to.equal(1);
+    expect(mocks.getQueueUrl.called).to.be.true;
     expect(mocks.getQueueUrl.lastCall.args[0]).to.deep.equal({
       QueueName: QUEUE_NAME
     });
@@ -116,13 +128,25 @@ describe("functions/pollQueue.handler", () => {
     const infoArgs = console.info.args.map(([msg]) => msg);
     expect(infoArgs).to.deep.equal([
       "Execution mutex acquired",
+      "Sending heartbeat metrics",
       "Poller start",
+      "Sending heartbeat metrics",
       "Pausing for",
       "Remaining",
       "Poller exit",
-      "Execution mutex released"
+      "Execution mutex released",
+      "Sending heartbeat metrics"
     ]);
 
+    expect(metricsStub.callCount).to.equal(3);
+    const metricsCall = metricsStub.args[0][0];
+    expect(metricsCall.poller_id).to.equal(awsRequestId);
+    expect(metricsCall).to.include.keys(
+      "items_in_queue",
+      "items_in_progress",
+      "items_in_waiting"
+    );
+
     const timeEndArgs = console.timeEnd.args.map(([msg]) => msg);
     expect(timeEndArgs).to.deep.equal([
       "SQS",
@@ -163,7 +187,7 @@ describe("functions/pollQueue.handler", () => {
       .callsFake(() => limitTime - Date.now());
 
     const startTime = Date.now();
-    await subject({}, { getRemainingTimeInMillis });
+    await subject({}, { awsRequestId, getRemainingTimeInMillis });
     const endTime = Date.now();
 
     const duration = endTime - startTime;
@@ -4,14 +4,20 @@ const AWS = require("aws-sdk");
 const DBD = new AWS.DynamoDB.DocumentClient();
 const SQS = new AWS.SQS({ apiVersion: "2012-11-05" });
 const Lambda = new AWS.Lambda({ apiVersion: "2015-03-31" });
+const Metrics = require("../lib/metrics");
 
 // Running list of timestamps for hits on rate limit
 let rateHits;
+// Last time heartbeat metrics were sent, for throttling
+let lastHeartbeat;
 
 module.exports.handler = async function(event, context) {
   const constants = require("../lib/constants");
   const { POLL_DELAY } = constants;
 
+  rateHits = [];
+  lastHeartbeat = false;
+
   try {
     await acquireExecutionLock(process.env, constants);
   } catch (err) {
@@ -20,7 +26,12 @@ module.exports.handler = async function(event, context) {
   }
   console.info("Execution mutex acquired");
 
-  rateHits = [];
+  try {
+    await sendHeartbeatMetrics(process.env, context);
+  } catch (err) {
+    console.warn("Failed to send initial heartbeat metrics", err);
+  }
+
   let polls = 0;
   console.info("Poller start");
   while (Math.floor(context.getRemainingTimeInMillis() / 1000) >= 1) {
@@ -33,6 +44,13 @@ module.exports.handler = async function(event, context) {
       console.error("Error in pollQueue", err);
       return;
     }
+
+    try {
+      await maybeSendHeartbeatMetrics(process.env, constants, context);
+    } catch (err) {
+      console.warn("Failed to send periodic heartbeat metrics", err);
+    }
+
     console.info("Pausing for", POLL_DELAY, "ms");
     await wait(POLL_DELAY);
     console.info("Remaining", context.getRemainingTimeInMillis(), "ms");
@@ -46,6 +64,12 @@ module.exports.handler = async function(event, context) {
     return;
   }
   console.info("Execution mutex released");
+
+  try {
+    await sendHeartbeatMetrics(process.env, context);
+  } catch (err) {
+    console.warn("Failed to send final heartbeat metrics", err);
+  }
 };
 
 const wait = delay => new Promise(resolve => setTimeout(resolve, delay));
@@ -77,6 +101,47 @@ const releaseExecutionLock = (
     Key: { key: EXECUTION_MUTEX_KEY }
   }).promise();
 
+// Throttle sending heartbeat metrics
+const maybeSendHeartbeatMetrics = async (env, constants, context) => {
+  if (
+    lastHeartbeat !== false &&
+    Date.now() - lastHeartbeat < constants.MIN_HEARTBEAT_PERIOD
+  ) {
+    console.info("Skipping heartbeat metrics ping", Date.now() - lastHeartbeat);
+    return;
+  }
+  await sendHeartbeatMetrics(env, context);
+};
+
+const sendHeartbeatMetrics = async (
+  { QUEUE_NAME },
+  { awsRequestId: poller_id }
+) => {
+  console.info("Sending heartbeat metrics");
+  lastHeartbeat = Date.now();
+  const { QueueUrl } = await SQS.getQueueUrl({
+    QueueName: QUEUE_NAME
+  }).promise();
+  const {
+    ApproximateNumberOfMessages: items_in_queue,
+    ApproximateNumberOfMessagesDelayed: items_in_waiting,
+    ApproximateNumberOfMessagesNotVisible: items_in_progress
+  } = await SQS.getQueueAttributes({
+    QueueUrl,
+    AttributeNames: [
+      "ApproximateNumberOfMessages",
+      "ApproximateNumberOfMessagesDelayed",
+      "ApproximateNumberOfMessagesNotVisible"
+    ]
+  }).promise();
+  await Metrics.pollerHeartbeat({
+    poller_id,
+    items_in_queue,
+    items_in_progress,
+    items_in_waiting
+  });
+};
+
 async function pollQueue(
   { QUEUE_NAME, PROCESS_QUEUE_FUNCTION },
   { MAX_LONG_POLL_PERIOD, RATE_PERIOD, RATE_LIMIT },
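The throttle in `maybeSendHeartbeatMetrics` above reduces to a timestamp comparison against `MIN_HEARTBEAT_PERIOD`. A standalone sketch of just that decision (names simplified; not the committed code, which also takes `env` and `context`):

```javascript
// Standalone sketch of the heartbeat throttle from the diff above.
const MIN_HEARTBEAT_PERIOD = 20 * 1000; // ms, matches lib/constants.js

let lastHeartbeat = false; // false = no ping sent yet this invocation

function shouldSendHeartbeat(now) {
  if (lastHeartbeat !== false && now - lastHeartbeat < MIN_HEARTBEAT_PERIOD) {
    return false; // too soon since the previous ping
  }
  lastHeartbeat = now;
  return true;
}
```

The test setup overrides `MIN_HEARTBEAT_PERIOD: 0`, which makes this predicate always true so every poll loop iteration sends a ping.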
@@ -1,3 +1,4 @@
+const sinon = require("sinon");
 const { expect } = require("chai");
 
 const {
@@ -11,12 +12,21 @@ const {
   constants: { QueueUrl, ReceiptHandle }
 } = global;
 
 // NOTE: Import the test subject as late as possible so that the mocks work
+const awsRequestId = "test-uuid";
+
+const Metrics = require("../lib/metrics");
 const processQueueItem = require("./processQueueItem");
 
 describe("functions/processQueueItem.handler", () => {
+  let metricsStub;
+
   beforeEach(() => {
     global.resetMocks();
+    metricsStub = sinon.stub(Metrics, "workerWorks");
   });
+
+  afterEach(() => {
+    metricsStub.restore();
+  });
 
   it("hits negative_uri on negative match from upstream service", async () => {
@@ -65,10 +75,11 @@ describe("functions/processQueueItem.handler", () => {
   const expectCommonItemProcessed = async positive => {
     const Body = makeBody();
     const signedImageUrl = "https://example.s3.amazonaws.com/someimage";
+    process.env.METRICS_URL = "https://example.com";
 
     mocks.getSignedUrl.returns(signedImageUrl);
 
-    await processQueueItem.handler({ ReceiptHandle, Body });
+    await processQueueItem.handler({ ReceiptHandle, Body }, { awsRequestId });
 
     expect(mocks.getSignedUrl.lastCall.args).to.deep.equal([
       "getObject",
@@ -111,6 +122,22 @@ describe("functions/processQueueItem.handler", () => {
       QueueUrl,
       ReceiptHandle
     });
 
+    const response = positive ? positiveMatchResponse : negativeMatchResponse;
+    expect(metricsStub.called).to.be.true;
+    expect(metricsStub.args[0][0]).to.deep.include({
+      consumer_name: defaultMessage.user,
+      worker_id: awsRequestId,
+      watchdog_id: defaultMessage.id,
+      photodna_tracking_id: response.TrackingId,
+      is_error: false,
+      is_match: response.IsMatch
+    });
+    expect(metricsStub.args[0][0]).to.include.keys(
+      "timing_sent",
+      "timing_received",
+      "timing_submitted"
+    );
+
   };
 });
@@ -4,8 +4,12 @@ const AWS = require("aws-sdk");
 const S3 = new AWS.S3({ apiVersion: "2006-03-01" });
 const SQS = new AWS.SQS({ apiVersion: "2012-11-05" });
 const request = require("request-promise-native");
+const Metrics = require("../lib/metrics");
 
-module.exports.handler = async function({ ReceiptHandle, Body }) {
+module.exports.handler = async function(
+  { ReceiptHandle, Body },
+  { awsRequestId }
+) {
   const {
     QUEUE_NAME,
     CONTENT_BUCKET: Bucket,
@@ -13,6 +17,7 @@ module.exports.handler = async function({ ReceiptHandle, Body }) {
   } = process.env;
 
   const {
+    datestamp,
     upstreamServiceUrl,
     id,
     user,
@@ -29,6 +34,9 @@ module.exports.handler = async function({ ReceiptHandle, Body }) {
     Key: image
   });
 
+  const timingSent = Date.now() - Date.parse(datestamp);
+
+  const timingReceivedStart = Date.now();
   const upstreamServiceResponse = await request.post({
     url: `${upstreamServiceUrl}?enhance`,
     headers: {
@@ -41,6 +49,7 @@ module.exports.handler = async function({ ReceiptHandle, Body }) {
       Value: imageUrl
     }
   });
+  const timingReceived = Date.now() - timingReceivedStart;
 
   const { IsMatch } = upstreamServiceResponse;
   if (!IsMatch) {
@@ -68,6 +77,7 @@ module.exports.handler = async function({ ReceiptHandle, Body }) {
     }).promise();
   }
 
+  const timingSubmittedStart = Date.now();
   await request.post({
     url: IsMatch ? positive_uri : negative_uri,
     headers: {
@@ -79,12 +89,25 @@ module.exports.handler = async function({ ReceiptHandle, Body }) {
         positive: upstreamServiceResponse.IsMatch
       }
     });
+    const timingSubmitted = Date.now() - timingSubmittedStart;
 
     const { QueueUrl } = await SQS.getQueueUrl({
      QueueName: QUEUE_NAME
    }).promise();
 
    await SQS.deleteMessage({ QueueUrl, ReceiptHandle }).promise();
+
+    await Metrics.workerWorks({
+      consumer_name: user,
+      worker_id: awsRequestId,
+      watchdog_id: id,
+      photodna_tracking_id: upstreamServiceResponse.TrackingId,
+      is_error: false,
+      is_match: upstreamServiceResponse.IsMatch,
+      timing_sent: timingSent,
+      timing_received: timingReceived,
+      timing_submitted: timingSubmitted
+    });
   } catch (err) {
     console.log("REQUEST ERROR", err);
   }
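The `timing_*` fields above are all computed the same way: capture `Date.now()` before a step and subtract after it completes. The pattern in miniature (the `measure` helper is illustrative, not in the commit, which inlines the arithmetic):

```javascript
// Illustrative pattern behind the timing_* fields: wall-clock delta
// around an async step. `measure` is a hypothetical helper.
const measure = async fn => {
  const start = Date.now();
  const result = await fn();
  return { result, elapsed: Date.now() - start };
};

// e.g. timing_received would wrap the upstream service POST:
// const { result, elapsed: timing_received } =
//   await measure(() => request.post({ /* ... */ }));
```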
@@ -11,5 +11,8 @@ module.exports = {
   MAX_LONG_POLL_PERIOD: 20,
   POLL_DELAY: 100,
   EXECUTION_MUTEX_KEY: "pollQueueExecutionExpires",
-  EXECUTION_MUTEX_TTL: 50 * 1000
+  EXECUTION_MUTEX_TTL: 50 * 1000,
+  MIN_HEARTBEAT_PERIOD: 20 * 1000,
+  TILES_STAGE_URL: "https://onyx_tiles.stage.mozaws.net/v3/links/ping-centre",
+  TILES_PROD_URL: "https://tiles.services.mozilla.com/v3/links/ping-centre"
 };
@@ -0,0 +1,95 @@
+const { expect } = require("chai");
+const { mocks } = global;
+const { TILES_STAGE_URL, TILES_PROD_URL } = require("./constants");
+const Metrics = require("./metrics");
+
+describe("lib/metrics", () => {
+  beforeEach(() => {
+    global.resetMocks();
+  });
+
+  describe("Metrics", () => {
+    describe("ping", () => {
+      const subject = Metrics.ping;
+
+      const expectPostURL = async url => {
+        await subject({ foo: true });
+        expect(mocks.requestPost.called).to.be.true;
+        expect(mocks.requestPost.args[0][0].url).to.equal(url);
+      };
+
+      it("uses METRICS_URL env var when available", async () => {
+        process.env.METRICS_URL = "https://example.com";
+        await expectPostURL(process.env.METRICS_URL);
+        delete process.env.METRICS_URL;
+      });
+
+      it("uses staging URL when NODE_ENV===development", async () => {
+        process.env.NODE_ENV = "development";
+        await expectPostURL(TILES_STAGE_URL);
+      });
+
+      it("uses production URL when NODE_ENV===production", async () => {
+        process.env.NODE_ENV = "production";
+        await expectPostURL(TILES_PROD_URL);
+      });
+    });
+
+    const expectPostBody = async (subject, event, params) => {
+      await subject(Object.assign({ ignored: "extra" }, params));
+      const body = mocks.requestPost.args[0][0].body;
+      expect(body).to.include.key("timestamp");
+      delete body.timestamp;
+      expect(body).to.deep.equal(
+        Object.assign(
+          {
+            topic: "watchdog-proxy",
+            event
+          },
+          params
+        )
+      );
+    };
+
+    describe("newItem", () => {
+      const subject = Metrics.newItem;
+      it("sends expected properties", async () => {
+        await expectPostBody(subject, "new_item", {
+          consumer_name: "foo",
+          watchdog_id: "bar",
+          type: "baz"
+        });
+      });
+    });
+
+    describe("pollerHeartbeat", () => {
+      const subject = Metrics.pollerHeartbeat;
+      it("sends expected properties", async () => {
+        await expectPostBody(subject, "poller_heartbeat", {
+          poller_id: "123",
+          items_in_queue: "456",
+          items_in_progress: "789",
+          items_in_waiting: "012"
+        });
+      });
+    });
+
+    describe("workerWorks", () => {
+      const subject = Metrics.workerWorks;
+      it("sends expected properties", async () => {
+        await expectPostBody(subject, "worker_works", {
+          consumer_name: "qwe",
+          worker_id: "ytr",
+          watchdog_id: "rty",
+          photodna_tracking_id: "uio",
+          is_match: "asd",
+          is_error: "fgh",
+          timing_retrieved: "jkl",
+          timing_sent: "zxc",
+          timing_received: "vbn",
+          timing_submitted: "mnb"
+        });
+      });
+    });
+  });
+});
@@ -0,0 +1,80 @@
+"use strict";
+
+const request = require("request-promise-native");
+
+const { TILES_STAGE_URL, TILES_PROD_URL } = require("./constants");
+
+const Metrics = (module.exports = {
+  ping: async (data = {}) => {
+    // Accept a METRICS_URL env var override or select URL based on NODE_ENV
+    let url;
+    if (process.env.METRICS_URL) {
+      url = process.env.METRICS_URL;
+    } else {
+      url =
+        process.env.NODE_ENV === "production"
+          ? TILES_PROD_URL
+          : TILES_STAGE_URL;
+    }
+    return request.post({
+      url,
+      headers: { "Content-Type": "application/json" },
+      json: true,
+      body: Object.assign(
+        {
+          topic: "watchdog-proxy",
+          timestamp: new Date().toISOString()
+        },
+        data
+      )
+    });
+  },
+
+  newItem: ({ consumer_name, watchdog_id, type }) =>
+    Metrics.ping({
+      event: "new_item",
+      consumer_name,
+      watchdog_id,
+      type
+    }),
+
+  pollerHeartbeat: ({
+    poller_id,
+    items_in_queue,
+    items_in_progress,
+    items_in_waiting
+  }) =>
+    Metrics.ping({
+      event: "poller_heartbeat",
+      poller_id,
+      items_in_queue,
+      items_in_progress,
+      items_in_waiting
+    }),
+
+  workerWorks: ({
+    consumer_name,
+    worker_id,
+    watchdog_id,
+    photodna_tracking_id,
+    is_match,
+    is_error,
+    timing_retrieved,
+    timing_sent,
+    timing_received,
+    timing_submitted
+  }) =>
+    Metrics.ping({
+      event: "worker_works",
+      consumer_name,
+      worker_id,
+      watchdog_id,
+      photodna_tracking_id,
+      is_match,
+      is_error,
+      timing_retrieved,
+      timing_sent,
+      timing_received,
+      timing_submitted
+    })
+});
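The URL selection in `ping` above can be restated as: an explicit `METRICS_URL` override wins, otherwise `NODE_ENV` picks between the production and stage Ping Centre endpoints. A minimal standalone restatement of that rule:

```javascript
// Standalone restatement of the URL selection in lib/metrics.js above.
const TILES_STAGE_URL =
  "https://onyx_tiles.stage.mozaws.net/v3/links/ping-centre";
const TILES_PROD_URL =
  "https://tiles.services.mozilla.com/v3/links/ping-centre";

const metricsUrl = env =>
  env.METRICS_URL ||
  (env.NODE_ENV === "production" ? TILES_PROD_URL : TILES_STAGE_URL);
```

Note that any non-production `NODE_ENV` (including unset) falls through to the stage URL; only the env var override reaches a custom endpoint such as the dev stack's mock logger.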
@@ -16,6 +16,11 @@ global.env = {
 global.constants = {
   ETag: '"ae1e7accaab42504a930ecc6e6aa34c2"',
   QueueUrl: "https://example.com/sqs/",
+  QueueAttributes: {
+    ApproximateNumberOfMessages: 200,
+    ApproximateNumberOfMessagesDelayed: 20,
+    ApproximateNumberOfMessagesNotVisible: 2
+  },
   MessageId: "abba123",
   requestId: "8675309",
   ReceiptHandle: "5551212",
@@ -32,21 +37,10 @@ global.constants = {
   }
 };
 
-const defaultConstantsModule = {
-  DEFAULT_HAWK_ALGORITHM: "sha256",
-  DEV_CREDENTIALS: {
-    devuser: {
-      key: "devkey",
-      algorithm: "sha256"
-    }
-  },
-  RATE_LIMIT: 5,
-  RATE_PERIOD: 500,
-  MAX_LONG_POLL_PERIOD: 20,
-  POLL_DELAY: 100,
-  EXECUTION_MUTEX_KEY: "pollQueueExecutionExpires",
-  EXECUTION_MUTEX_TTL: 50 * 1000
-};
+const defaultConstantsModule = Object.assign({}, require("./constants"), {
+  RATE_PERIOD: 500,
+  MIN_HEARTBEAT_PERIOD: 0
+});
 global.constantsModule = Object.assign({}, defaultConstantsModule);
 mockRequire("./constants", global.constantsModule);
 
@@ -55,6 +49,7 @@ global.mocks = {
   getItem: (AWS.DynamoDB.DocumentClient.prototype.get = sinon.stub()),
   putItem: (AWS.DynamoDB.DocumentClient.prototype.put = sinon.stub()),
   deleteItem: (AWS.DynamoDB.DocumentClient.prototype.delete = sinon.stub()),
+  getQueueAttributes: (AWS.SQS.prototype.getQueueAttributes = sinon.stub()),
   getQueueUrl: (AWS.SQS.prototype.getQueueUrl = sinon.stub()),
   getSignedUrl: (AWS.S3.prototype.getSignedUrl = sinon.stub()),
   putObject: (AWS.S3.prototype.putObject = sinon.stub()),
@@ -71,7 +66,7 @@ global.resetMocks = () => {
   const {
     mocks,
     makePromiseFn,
-    constants: { QueueUrl, MessageId, ETag }
+    constants: { QueueUrl, QueueAttributes, MessageId, ETag }
   } = global;
 
   Object.assign(global.constantsModule, defaultConstantsModule);
@@ -84,6 +79,7 @@ global.resetMocks = () => {
   mocks.deleteItem.returns(makePromiseFn({}));
   mocks.putItem.returns(makePromiseFn({}));
   mocks.getItem.returns(makePromiseFn({}));
+  mocks.getQueueAttributes.returns(makePromiseFn({ QueueAttributes }));
   mocks.getQueueUrl.returns(makePromiseFn({ QueueUrl }));
   mocks.getSignedUrl.returns("");
   mocks.putObject.returns(makePromiseFn({ ETag }));
@@ -6,7 +6,7 @@
     "precommit": "lint-staged && npm run test:js",
     "deploy": "cross-env NODE_ENV=production serverless deploy",
     "deploy:dev": "cross-env NODE_ENV=development ENABLE_DEV_AUTH=1 serverless deploy",
-    "deploy:master": "cross-env STAGE=dev DOMAIN=dev NODE_ENV=development ENABLE_DEV_AUTH=1 UPSTREAM_SERVICE_URL=__MOCK__ UPSTREAM_SERVICE_KEY=__MOCK__ serverless deploy",
+    "deploy:master": "cross-env STAGE=dev DOMAIN=dev NODE_ENV=development ENABLE_DEV_AUTH=1 UPSTREAM_SERVICE_URL=__MOCK__ UPSTREAM_SERVICE_KEY=__MOCK__ METRICS_URL=https://watchdog-proxy.dev.mozaws.net/mock/log serverless deploy",
     "info": "serverless info",
     "lint": "npm-run-all lint:*",
     "lint:js": "eslint functions lib test",
@@ -254,3 +254,11 @@ functions:
       - http:
           path: mock/client/positive
          method: post
+
+  mockLogPost:
+    handler: functions/mockEndpoints.logPost
+    name: ${self:custom.prefix}-mockLogPost
+    events:
+      - http:
+          path: mock/log
+          method: post