Initial queue poller experiment

This commit is contained in:
Les Orchard 2018-05-01 13:49:43 -04:00
Parent 3be0049be4
Commit e2717c4c08
9 changed files: 8716 additions and 0 deletions

9
.editorconfig Normal file

@@ -0,0 +1,9 @@
# http://editorconfig.org
root = true
[*]
indent_style = space
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
indent_size = 2

5
.gitignore vendored Normal file

@@ -0,0 +1,5 @@
.*.sw?
*.log
.serverless
node_modules
npm-debug.log

71
README.md Normal file

@@ -0,0 +1,71 @@
# watchdog-proxy
## Development
### Quickstart Notes
1. Development is currently done directly on Amazon Web Services, so you'll need to [sign up for an account](https://aws.amazon.com/) or [request a Dev IAM account from Mozilla Cloud Operations](https://mana.mozilla.org/wiki/display/SVCOPS/Requesting+A+Dev+IAM+account+from+Cloud+Operations). (The latter is available only to Mozillians.)
2. Optional: [Install the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/installing.html). This gives you tools to work with AWS from the command line.
3. Ensure [node.js 8.11.1](https://nodejs.org/en/) or newer is installed.
4. Clone the project repository - e.g. `git clone git@github.com:mozilla/watchdog-proxy.git`
5. Install all the dependencies for the project: `cd watchdog-proxy && npm install`
6. If you already have an AWS key ID and secret, [follow the Serverless quick-start docs to configure your credentials](https://serverless.com/framework/docs/providers/aws/guide/credentials#quick-setup). If you don't, [follow the full guide to acquire and configure these credentials](https://serverless.com/framework/docs/providers/aws/guide/credentials/).
7. Choose a unique stage name to use for development - e.g. mine is `lmorchard`. This is used in naming all the pieces of the stack you deploy, in order to distinguish them from any other stack.
8. Try deploying the service to AWS: `npm run deploy -- --stage <stage name>`. You should see output like the following:
```
$ npm run deploy -- --stage lmorchard
Serverless: Packaging service...
Serverless: Excluding development dependencies...
Serverless: Creating Stack...
Serverless: Checking Stack create progress...
.....
Serverless: Stack create finished...
Serverless: Uploading CloudFormation file to S3...
Serverless: Uploading artifacts...
Serverless: Uploading service .zip file to S3 (6.39 MB)...
Serverless: Validating template...
Serverless: Updating Stack...
Serverless: Checking Stack update progress...
...........................................................................
Serverless: Stack update finished...
Service Information
service: watchdog-proxy
stage: lmorchard
region: us-east-1
stack: watchdog-proxy-lmorchard
api keys:
None
endpoints:
GET - https://30r00qsyhf.execute-api.us-east-1.amazonaws.com/lmorchard/accept
functions:
accept: watchdog-proxy-lmorchard-accept
pollQueue: watchdog-proxy-lmorchard-pollQueue
processQueueItem: watchdog-proxy-lmorchard-processQueueItem
```
If everything was successful, you should now have a running stack with an HTTPS resource to accept requests listed as one of the endpoints.
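As a quick smoke test of a fresh deployment, you can hit the `accept` endpoint and inspect the JSON it returns. Here is a minimal sketch using the project's `request-promise-native` dependency; the file name is hypothetical, and you should substitute the endpoint URL printed for your own stage:

```js
// smoke-test.js - hypothetical helper, not part of this commit
const request = require("request-promise-native");

// Replace with the endpoint printed by `npm run deploy` for your stage
const ENDPOINT =
  "https://<your-api-id>.execute-api.us-east-1.amazonaws.com/<stage>/accept";

request({ uri: ENDPOINT, json: true })
  .then(body =>
    console.log("requestId:", body.requestId, "sqsResult:", body.sqsResult)
  )
  .catch(console.error);
```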
To remove this stack from AWS and delete everything, run `./node_modules/.bin/serverless remove --stage <stage name>`
The [Serverless docs on workflow](https://serverless.com/framework/docs/providers/aws/guide/workflow/) are useful.
Here are a few more useful example commands:
```
# Tail logs from lambda functions
npm run logs -- --stage lmorchard -f accept -t
# Deploy an individual function on file changes (double-double-dashes because
# shells are weird)
npm run watch -- -- --stage lmorchard -f accept
```

44
functions/accept.js Normal file

@@ -0,0 +1,44 @@
"use strict";
const AWS = require("aws-sdk");
const S3 = new AWS.S3({ apiVersion: "2006-03-01" });
const SQS = new AWS.SQS({ apiVersion: "2012-11-05" });
const { QUEUE_NAME, CONTENT_BUCKET } = process.env;
module.exports.handler = async function(
{ requestContext: { requestId } },
context
) {
console.time("accept");
const responseCode = 200;
const responseBody = { requestId };
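// Store a placeholder object in S3 under the API Gateway request ID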
console.time("acceptS3");
const result = await S3.putObject({
Bucket: CONTENT_BUCKET,
Key: requestId,
Body: "THIS WILL BE AN IMAGE SOMEDAY"
}).promise();
responseBody.s3Result = result;
console.timeEnd("acceptS3");
console.time("acceptSQS");
const { QueueUrl } = await SQS.getQueueUrl({
QueueName: QUEUE_NAME
}).promise();
const { MessageId } = await SQS.sendMessage({
MessageBody: JSON.stringify({
nowish: Date.now(),
requestId
}),
QueueUrl
}).promise();
responseBody.sqsResult = "SUCCESS " + MessageId;
console.timeEnd("acceptSQS");
console.timeEnd("accept");
return {
statusCode: responseCode,
body: JSON.stringify(responseBody)
};
};
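For reference, the handler above only reads `requestContext.requestId` from the API Gateway event, so it can be exercised without API Gateway using a minimal fake event. A hypothetical local harness (assumes `aws-sdk` is installed, AWS credentials are configured, and the `QUEUE_NAME` and `CONTENT_BUCKET` environment variables are set in your shell):

```js
// local-accept.js - hypothetical test harness, not part of this commit
const { handler } = require("./functions/accept");

handler({ requestContext: { requestId: `local-${Date.now()}` } }, {})
  .then(({ statusCode, body }) => console.log(statusCode, body))
  .catch(console.error);
```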

137
functions/pollQueue.js Normal file

@@ -0,0 +1,137 @@
"use strict";
const AWS = require("aws-sdk");
const DBD = new AWS.DynamoDB.DocumentClient();
const SQS = new AWS.SQS({ apiVersion: "2012-11-05" });
const Lambda = new AWS.Lambda({ apiVersion: "2015-03-31" });
const { CONFIG_TABLE, QUEUE_NAME, PROCESS_QUEUE_FUNCTION } = process.env;
const RATE_LIMIT = 5;
const RATE_PERIOD = 1000;
const MAX_LONG_POLL_PERIOD = 20;
const POLL_DELAY = 100;
const EXECUTION_MUTEX_KEY = "pollQueueExecutionExpires";
const EXECUTION_MUTEX_TTL = 50 * 1000;
const wait = delay => new Promise(resolve => setTimeout(resolve, delay));
// Running list of timestamps for hits on rate limit
let rateHits = [];
module.exports.handler = async function(event, context) {
const now = Date.now();
try {
await DBD.put({
TableName: CONFIG_TABLE,
Item: {
key: EXECUTION_MUTEX_KEY,
value: now + EXECUTION_MUTEX_TTL
},
ConditionExpression: "#key <> :key OR (#key = :key AND #value < :value)",
ExpressionAttributeNames: { "#key": "key", "#value": "value" },
ExpressionAttributeValues: {
":key": EXECUTION_MUTEX_KEY,
":value": Date.now()
}
}).promise();
} catch (err) {
console.warn("Could not acquire execution mutex", err);
return;
}
console.info("Execution mutex acquired");
let polls = 0;
console.log("Poller start");
do {
try {
const tname = `pollQueue ${++polls}`;
console.time(tname);
await pollQueue(context);
console.timeEnd(tname);
} catch (err) {
console.error("Error in pollQueue", err);
return;
}
await wait(POLL_DELAY);
console.log("Remaining", context.getRemainingTimeInMillis(), "ms");
} while (Math.floor(context.getRemainingTimeInMillis() / 1000) > 1);
console.log("Poller exit");
try {
await DBD.delete({
TableName: CONFIG_TABLE,
Key: { key: EXECUTION_MUTEX_KEY }
}).promise();
} catch (err) {
console.warn("Could not release execution mutex", err);
return;
}
console.info("Execution mutex released");
};
async function pollQueue(context) {
// Calculate seconds remaining for poller execution, using maximum for
// long poll or whatever time we have left
const WaitTimeSeconds = Math.min(
MAX_LONG_POLL_PERIOD,
Math.floor(context.getRemainingTimeInMillis() / 1000) - 1
);
if (WaitTimeSeconds <= 0) {
console.log("Out of time");
return;
}
// Slide the rate limit window and calculate available hits
const rateWindowStart = Date.now() - RATE_PERIOD;
rateHits = rateHits.filter(item => item > rateWindowStart);
const MaxNumberOfMessages = RATE_LIMIT - rateHits.length;
if (MaxNumberOfMessages <= 0) {
console.log("Yielding to limit rate");
return;
}
// Long-poll for SQS messages up to rate limit or execution timeout
console.time("SQS");
const { QueueUrl } = await SQS.getQueueUrl({
QueueName: QUEUE_NAME
}).promise();
const receiveResult = await SQS.receiveMessage({
QueueUrl,
WaitTimeSeconds,
MaxNumberOfMessages,
MessageAttributeNames: ["All"]
}).promise();
console.timeEnd("SQS");
// Process the messages received from queue
const messages = receiveResult.Messages || [];
if (messages.length > 0) {
// Invoke the workers in parallel, since we're only ever going
// to invoke up to the rate limit
console.time("Worker batch");
await Promise.all(
messages.map(async message => {
const messageBody = JSON.parse(message.Body);
const mtname = `Message ${messageBody.requestId}`;
console.time(mtname);
// Record a hit for rate limit
rateHits.push(Date.now());
// Invoke the process function for queue item
await Lambda.invoke({
FunctionName: PROCESS_QUEUE_FUNCTION,
InvocationType: "Event",
LogType: "None",
Payload: JSON.stringify(message)
}).promise();
console.timeEnd(mtname);
})
);
console.timeEnd("Worker batch");
}
}
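The rate limiting in `pollQueue` is a sliding window over invocation timestamps: before each receive, timestamps older than `RATE_PERIOD` are dropped and the number of messages requested is capped at whatever headroom remains under `RATE_LIMIT`. A stripped-down sketch of just that bookkeeping, isolated for illustration:

```js
// Sliding-window rate limit, extracted for illustration (not part of this commit)
const RATE_LIMIT = 5;     // max worker invocations per window
const RATE_PERIOD = 1000; // window length in milliseconds

let rateHits = [];

// How many messages may be fetched right now without exceeding the limit
function availableSlots(now = Date.now()) {
  rateHits = rateHits.filter(t => t > now - RATE_PERIOD);
  return Math.max(0, RATE_LIMIT - rateHits.length);
}

// Call once per worker invocation
function recordHit(now = Date.now()) {
  rateHits.push(now);
}
```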

37
functions/processQueueItem.js Normal file

@@ -0,0 +1,37 @@
"use strict";
const AWS = require("aws-sdk");
const S3 = new AWS.S3({ apiVersion: "2006-03-01" });
const SQS = new AWS.SQS({ apiVersion: "2012-11-05" });
const request = require("request-promise-native");
const { QUEUE_NAME, CONTENT_BUCKET } = process.env;
module.exports.handler = async function({ ReceiptHandle, Body }, context) {
const { requestId } = JSON.parse(Body);
console.log("MESSAGE BODY", requestId);
try {
const getResult = await S3.getObject({
Bucket: CONTENT_BUCKET,
Key: requestId
}).promise();
console.log("GET", getResult);
await S3.deleteObject({
Bucket: CONTENT_BUCKET,
Key: requestId
}).promise();
await request(
`https://webhook.site/c0a8dd46-1405-4172-a99a-0646663f3dc2?requestId=${requestId}`
);
} catch (err) {
console.log("REQUEST ERROR", err);
}
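// Remove the message from the queue whether or not processing succeeded above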
const { QueueUrl } = await SQS.getQueueUrl({
QueueName: QUEUE_NAME
}).promise();
await SQS.deleteMessage({ QueueUrl, ReceiptHandle }).promise();
};
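Because `pollQueue` forwards the raw SQS message as the Lambda payload, the event this handler receives looks roughly like the sketch below: `ReceiptHandle` comes from SQS, and `Body` is the JSON string that `accept` enqueued. The concrete values are placeholders for illustration only:

```js
// Approximate shape of the event passed to processQueueItem (illustration only)
const exampleEvent = {
  MessageId: "<SQS message ID>",
  ReceiptHandle: "<opaque SQS receipt handle>",
  Body: JSON.stringify({
    nowish: 1525196983000, // Date.now() at enqueue time
    requestId: "<API Gateway request ID>"
  })
};
```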

8236
package-lock.json generated Normal file

File diff not shown because it is too large

29
package.json Normal file

@@ -0,0 +1,29 @@
{
"name": "watchdog-proxy",
"version": "0.0.1",
"description": "Rate limiting proxy for watchdog requests",
"scripts": {
"deploy": "serverless deploy",
"remove": "serverless remove",
"logs": "serverless logs",
"watch": "onchange -v \"functions/*.js\" \"lib/*.js\" -- npm run deploy"
},
"homepage": "https://github.com/mozilla/watchdog-proxy/",
"repository": {
"type": "git",
"url": "https://github.com/mozilla/watchdog-proxy.git"
},
"author": "Les Orchard <me@lmorchard.com>",
"license": "MPL-2.0",
"devDependencies": {
"aws-sdk": "^2.224.1",
"onchange": "^3.3.0",
"serverless": "^1.8.0",
"serverless-s3-remover": "^0.4.1",
"serverless-sqs-alarms-plugin": "^0.0.2"
},
"dependencies": {
"request": "^2.85.0",
"request-promise-native": "^1.0.5"
}
}

148
serverless.yml Normal file

@@ -0,0 +1,148 @@
service: watchdog-proxy
plugins:
- serverless-s3-remover
custom:
region: ${self:provider.region}
stage: ${opt:stage, self:provider.stage}
prefix: ${self:service}-${self:custom.stage}
process: ${self:custom.prefix}-processQueueItem
config: ${self:custom.prefix}-config
sns: ${self:custom.prefix}-trigger
sqs: ${self:custom.prefix}-messages
contentBucket: ${self:custom.prefix}-content
remover:
buckets:
- ${self:custom.contentBucket}
provider:
name: aws
runtime: nodejs8.10
stage: dev
region: us-east-1
memorySize: 128
iamRoleStatements:
- Effect: Allow
Action:
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:DeleteItem
- dynamodb:UpdateItem
- dynamodb:Query
- dynamodb:Scan
Resource:
- arn:aws:dynamodb:*:*:table/${self:custom.config}
- Effect: Allow
Action:
- lambda:InvokeFunction
Resource: arn:aws:lambda:*:*:function:${self:custom.process}
- Effect: Allow
Action:
- sqs:ChangeMessageVisibility
- sqs:ChangeMessageVisibilityBatch
- sqs:DeleteMessage
- sqs:DeleteMessageBatch
- sqs:GetQueueAttributes
- sqs:GetQueueUrl
- sqs:ReceiveMessage
- sqs:SendMessage
- sqs:SendMessageBatch
Resource: arn:aws:sqs:*:*:${self:custom.sqs}
- Effect: Allow
Action:
- s3:GetObject
- s3:PutObject
- s3:PutObjectAcl
- s3:DeleteObject
- s3:ListBucket
Resource:
- arn:aws:s3:::${self:custom.contentBucket}
- arn:aws:s3:::${self:custom.contentBucket}/*
resources:
Resources:
S3BucketContent:
Type: AWS::S3::Bucket
Properties:
BucketName: ${self:custom.contentBucket}
LifecycleConfiguration:
Rules:
- Id: DailyCleanup
Status: Enabled
ExpirationInDays: 1
Messages:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.sqs}
MessageRetentionPeriod: 1209600
VisibilityTimeout: 60
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt:
- MessagesDeadLetterQueue
- Arn
maxReceiveCount: 10
MessagesDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.sqs}-dead-letter-queue
MessageRetentionPeriod: 1209600
Config:
Type: AWS::DynamoDB::Table
Properties:
TableName: ${self:custom.config}
AttributeDefinitions:
- AttributeName: key
AttributeType: S
KeySchema:
- AttributeName: key
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
package:
exclude:
- docs/**
- helpers/**
- test/**
functions:
accept:
handler: functions/accept.handler
name: ${self:custom.prefix}-accept
environment:
CONFIG_TABLE: ${self:custom.config}
QUEUE_NAME: ${self:custom.sqs}
CONTENT_BUCKET: ${self:custom.contentBucket}
events:
- http:
path: accept
method: get
pollQueue:
timeout: 60
handler: functions/pollQueue.handler
name: ${self:custom.prefix}-pollQueue
environment:
CONFIG_TABLE: ${self:custom.config}
QUEUE_NAME: ${self:custom.sqs}
CONTENT_BUCKET: ${self:custom.contentBucket}
PROCESS_QUEUE_FUNCTION: ${self:custom.process}
events:
- schedule: rate(1 minute)
processQueueItem:
timeout: 60
handler: functions/processQueueItem.handler
name: ${self:custom.process}
environment:
CONFIG_TABLE: ${self:custom.config}
QUEUE_NAME: ${self:custom.sqs}
CONTENT_BUCKET: ${self:custom.contentBucket}