Merge pull request #15 from jonasfj/analysis-debug-service
analysis debug service as is now deployed
Commit f0b5962dae
@ -0,0 +1,320 @@
|
|||
AWSTemplateFormatVersion: '2010-09-09'
|
||||
|
||||
Description: >
|
||||
CloudFormation template for the telemetry-analysis worker setup. This stack
outputs a queue to which analysis tasks can be submitted; a pool of spot
workers automatically scales to do the work.
|
||||
|
||||
################################################################################
|
||||
|
||||
Parameters:
|
||||
|
||||
instanceType:
|
||||
Description: Worker instance type
|
||||
Type: String
|
||||
Default: m1.xlarge
|
||||
AllowedValues:
|
||||
- m1.small
|
||||
- m1.medium
|
||||
- m1.large
|
||||
- m1.xlarge
|
||||
- m2.xlarge
|
||||
- m2.2xlarge
|
||||
- m2.4xlarge
|
||||
- c1.medium
|
||||
- c1.xlarge
|
||||
ConstraintDescription: must be a valid EC2 instance type.
|
||||
|
||||
#REMARK: We should consider removing keyName as it shouldn't be possible to
|
||||
# ssh in to the worker instances
|
||||
keyName:
|
||||
Description: Name of key pair for SSH
|
||||
Type: String
|
||||
Default: jonasfj
|
||||
|
||||
spotPrice:
|
||||
Description: Spot price for workers
|
||||
Type: String
|
||||
Default: 0.1
|
||||
|
||||
sourcesBucket:
|
||||
Description: Bucket containing sources and templates
|
||||
Type: String
|
||||
Default: telemetry-code
|
||||
|
||||
sourcesVersion:
|
||||
Description: Version of sources to load from sources bucket
|
||||
Type: String
|
||||
Default: '1'
|
||||
|
||||
################################################################################
|
||||
|
||||
Mappings:
|
||||
|
||||
# A map from region to 64 bit Ubuntu 13.04 backed by instance storage
|
||||
# We use instance storage to avoid stability issues with EBS also we don't
|
||||
# have to pay for IO. We'll initialize these with CloudInit, later.
|
||||
# See: http://cloud-images.ubuntu.com/locator/ec2/
|
||||
regionToAMI:
|
||||
ap-northeast-1: {AMI: ami-7f41da7e}
|
||||
ap-southeast-1: {AMI: ami-3af8b268}
|
||||
ap-southeast-2: {AMI: ami-a5960a9f}
|
||||
eu-west-1: {AMI: ami-2adc3c5d}
|
||||
sa-east-1: {AMI: ami-f1dd7bec}
|
||||
us-east-1: {AMI: ami-3d257954}
|
||||
us-west-1: {AMI: ami-20e5d265}
|
||||
us-west-2: {AMI: ami-2460f914}
|
||||
|
||||
|
||||
################################################################################
|
||||
|
||||
Resources:
|
||||
|
||||
# Input queue to which analysis tasks should be posted
|
||||
telemetryAnalysisInput:
|
||||
Type: AWS::SQS::Queue
|
||||
Properties:
|
||||
DelaySeconds: 15
|
||||
MessageRetentionPeriod: 345600
|
||||
ReceiveMessageWaitTimeSeconds: 20
|
||||
VisibilityTimeout: 1800
|
||||
|
||||
# Security group for workers, this should only allow for access to SQS and S3
|
||||
workerSecurityGroup:
|
||||
Type: AWS::EC2::SecurityGroup
|
||||
Properties:
|
||||
GroupDescription: telemetry-analysis worker security group
|
||||
SecurityGroupIngress:
|
||||
- {IpProtocol: tcp, FromPort: 22, ToPort: 22, CidrIp: 0.0.0.0/0}
|
||||
|
||||
# IAM role for analysis workers
|
||||
workerIAMRole:
|
||||
Type: AWS::IAM::Role
|
||||
Properties:
|
||||
AssumeRolePolicyDocument:
|
||||
Statement:
|
||||
- Effect: Allow
|
||||
Principal:
|
||||
Service:
|
||||
- ec2.amazonaws.com
|
||||
Action:
|
||||
- 'sts:AssumeRole'
|
||||
Path: /telemetry/analysis/
|
||||
Policies:
|
||||
# Grant access to analysis-worker tarball in sourcesBucket
|
||||
- PolicyName: workerSourcesBucketAccessPolicy
|
||||
PolicyDocument:
|
||||
Statement:
|
||||
- Effect: Allow
|
||||
Action:
|
||||
- 's3:GetObject'
|
||||
Resource: {"Fn::Join": ["", [
|
||||
'arn:aws:s3:::',
|
||||
{Ref: sourcesBucket},
|
||||
/v,
|
||||
{Ref: sourcesVersion},
|
||||
/analysis-worker.tar.gz
|
||||
]]}
|
||||
|
||||
# IAM instance profile granting workerIAMRole to workers
|
||||
workerInstanceProfile:
|
||||
Type: AWS::IAM::InstanceProfile
|
||||
Properties:
|
||||
Path: /telemetry/analysis/
|
||||
Roles:
|
||||
- workerIAMRole
|
||||
|
||||
# Worker launch configuration, decides how a worker is launched.
|
||||
workerLaunchConfig:
|
||||
Type: AWS::AutoScaling::LaunchConfiguration
|
||||
DependsOn:
|
||||
- sourcesBucketCfnPolicy
|
||||
Metadata:
|
||||
Comment: telemetry-analysis worker
|
||||
AWS::CloudFormation::Init:
|
||||
config:
|
||||
sources:
|
||||
/home/ubuntu: {'Fn::Join': ["",
|
||||
[
|
||||
"http://",
|
||||
{Ref: sourcesBucket},
|
||||
.s3.amazonaws.com/v,
|
||||
{Ref: sourcesVersion},
|
||||
/analysis-worker.tar.gz
|
||||
]
|
||||
]}
|
||||
commands:
|
||||
run:
|
||||
command: {'Fn::Join': [" ", [
|
||||
"python",
|
||||
"manager.py",
|
||||
"-k",
|
||||
{Ref: workerKeys},
|
||||
"-s",
|
||||
{'Fn::GetAtt': [workerKeys, SecretAccessKey]},
|
||||
"-w",
|
||||
".........",
|
||||
"-q",
|
||||
{'Fn::GetAtt': [telemetryAnalysisInput, QueueName]}
|
||||
]]}
|
||||
cwd: '/home/ubuntu'
|
||||
ignoreErrors: false
|
||||
Properties:
|
||||
KeyName: {Ref: keyName}
|
||||
SpotPrice: {Ref: spotPrice}
|
||||
ImageId: {'Fn::FindInMap': [
|
||||
'regionToAMI',
|
||||
{Ref: 'AWS::Region'},
|
||||
'AMI'
|
||||
]}
|
||||
InstanceType: {Ref: instanceType}
|
||||
IamInstanceProfile: {Ref: workerInstanceProfile}
|
||||
UserData: {'Fn::Base64': {'Fn::Join': ['', [
|
||||
"#!/bin/bash\n",
|
||||
"sudo apt-get -y install python-setuptools\n",
|
||||
"sudo easy_install https://s3.amazonaws.com/cloudformation-examples/aws-cfn-bootstrap-latest.tar.gz",
|
||||
"cfn-init ",
|
||||
" --stack ", {Ref: 'AWS::StackName'},
|
||||
" --resource workerLaunchConfig ",
|
||||
" --access-key ", {Ref: "cfnKeys"},
|
||||
" --secret-key ", {'Fn::GetAtt': [cfnKeys, SecretAccessKey]},
|
||||
" --region ", {Ref: 'AWS::Region'}, "\n"
|
||||
]
|
||||
]
|
||||
}
|
||||
}
|
||||
UserData: {'Fn::Base64': {'Fn::Join': ['', [
|
||||
"#!/bin/bash -x\n",
|
||||
"sudo apt-get -y python-yaml python-boto xz-utils s3cmd",
|
||||
"cfn-init ",
|
||||
" --stack ", {Ref: 'AWS::StackName'},
|
||||
" --resource workerLaunchConfig ",
|
||||
" --access-key ", {Ref: "cfnKeys"},
|
||||
" --secret-key ", {'Fn::GetAtt': [cfnKeys, SecretAccessKey]},
|
||||
" --region ", {Ref: 'AWS::Region'}, "\n",
|
||||
"cfn-signal -e $? '", {Ref: "MyWaitHandle"}, "'\n"
|
||||
]
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
# Auto scaling group for workers; this entity is modified by worker scaling
# policies, which are activated by CloudWatch alarms.
|
||||
workerAutoScaleGroup:
|
||||
Type: AWS::AutoScaling::AutoScalingGroup
|
||||
Properties:
|
||||
AvailabilityZones: {'Fn::GetAZs': ''}
|
||||
LaunchConfigurationName: {Ref: workerLaunchConfig}
|
||||
MinSize: 0
|
||||
MaxSize: 20
|
||||
|
||||
# Scaling policies: actions that can be executed by a CloudWatch alarm
|
||||
workerScaleUpPolicy:
|
||||
Type: AWS::AutoScaling::ScalingPolicy
|
||||
Properties:
|
||||
AdjustmentType: ChangeInCapacity
|
||||
AutoScalingGroupName: {Ref: workerAutoScaleGroup}
|
||||
Cooldown: 60
|
||||
ScalingAdjustment: 1
|
||||
|
||||
workerQuickStartPolicy:
|
||||
Type: AWS::AutoScaling::ScalingPolicy
|
||||
Properties:
|
||||
AdjustmentType: ChangeInCapacity
|
||||
AutoScalingGroupName: {Ref: workerAutoScaleGroup}
|
||||
Cooldown: 60
|
||||
ScalingAdjustment: 10
|
||||
|
||||
workerScaleDownPolicy:
|
||||
Type: AWS::AutoScaling::ScalingPolicy
|
||||
Properties:
|
||||
AdjustmentType: ChangeInCapacity
|
||||
AutoScalingGroupName: {Ref: workerAutoScaleGroup}
|
||||
Cooldown: 60
|
||||
ScalingAdjustment: -1
|
||||
|
||||
workerQuickKillPolicy:
|
||||
Type: AWS::AutoScaling::ScalingPolicy
|
||||
Properties:
|
||||
AdjustmentType: ChangeInCapacity
|
||||
AutoScalingGroupName: {Ref: workerAutoScaleGroup}
|
||||
Cooldown: 60
|
||||
ScalingAdjustment: -10
|
||||
|
||||
# CloudWatch alarms, these execute scaling policies
|
||||
manyMessagesAddedAlarm:
|
||||
Type: AWS::CloudWatch::Alarm
|
||||
Properties:
|
||||
AlarmDescription: Scale up if 100+ messages are added at once
|
||||
Namespace: AWS/SQS
|
||||
MetricName: NumberOfMessagesSent
|
||||
Dimensions:
|
||||
- Name: QueueName
|
||||
Value: {'Fn::GetAtt': [telemetryAnalysisInput, QueueName]}
|
||||
Statistic: Sum
|
||||
Period: 900
|
||||
EvaluationPeriods: 1
|
||||
Threshold: 100
|
||||
ComparisonOperator: GreaterThanThreshold
|
||||
AlarmActions:
|
||||
- {Ref: workerQuickStartPolicy}
|
||||
|
||||
tooManyMessagesAlarm:
|
||||
Type: AWS::CloudWatch::Alarm
|
||||
Properties:
|
||||
AlarmDescription: Scale up if more than 10 messages
|
||||
Namespace: AWS/SQS
|
||||
MetricName: ApproximateNumberOfMessagesVisible
|
||||
Dimensions:
|
||||
- Name: QueueName
|
||||
Value: {'Fn::GetAtt': [telemetryAnalysisInput, QueueName]}
|
||||
Statistic: Sum
|
||||
Period: 60
|
||||
EvaluationPeriods: 10
|
||||
Threshold: 1
|
||||
ComparisonOperator: GreaterThanThreshold
|
||||
AlarmActions:
|
||||
- {Ref: workerScaleUpPolicy}
|
||||
|
||||
notEnoughMessagesAlarm:
|
||||
Type: AWS::CloudWatch::Alarm
|
||||
Properties:
|
||||
AlarmDescription: Scale down if there are not enough messages
|
||||
Namespace: AWS/SQS
|
||||
MetricName: NumberOfEmptyReceives
|
||||
Dimensions:
|
||||
- Name: QueueName
|
||||
Value: {'Fn::GetAtt': [telemetryAnalysisInput, QueueName]}
|
||||
Statistic: Sum
|
||||
Period: 60
|
||||
EvaluationPeriods: 10
|
||||
Threshold: 1
|
||||
ComparisonOperator: GreaterThanThreshold
|
||||
AlarmActions:
|
||||
- {Ref: workerScaleDownPolicy}
|
||||
|
||||
totalStarvationAlarm:
|
||||
Type: AWS::CloudWatch::Alarm
|
||||
Properties:
|
||||
AlarmDescription: Kill a lot of workers if they poll too much
|
||||
Namespace: AWS/SQS
|
||||
MetricName: NumberOfEmptyReceives
|
||||
Dimensions:
|
||||
- Name: QueueName
|
||||
Value: {'Fn::GetAtt': [telemetryAnalysisInput, QueueName]}
|
||||
Statistic: Sum
|
||||
Period: 900
|
||||
EvaluationPeriods: 1
|
||||
Threshold: 5000
|
||||
ComparisonOperator: GreaterThanThreshold
|
||||
AlarmActions:
|
||||
- {Ref: workerQuickKillPolicy}
|
||||
|
||||
################################################################################
|
||||
|
||||
Outputs:
|
||||
|
||||
inputQueueName:
|
||||
Description: SQS Queue name for input
|
||||
Value: {'Fn::GetAtt': [telemetryAnalysisInput, QueueName]}
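For reference, the workers defined later in this commit consume JSON messages from this queue with the fields id, code, target-queue, files and size (see generate_tasks in the job-creation script below). A minimal sketch of posting one task with boto; the queue name and file key are placeholders, and in practice the queue name comes from the inputQueueName stack output:

import boto.sqs
from boto.sqs.jsonmessage import JSONMessage

conn = boto.sqs.connect_to_region('us-west-2')
queue = conn.get_queue('telemetry-analysis-input')  # placeholder, use the stack output
queue.set_message_class(JSONMessage)
queue.write(queue.new_message(body = {
    'id': 'example-task-0',                      # unique task id
    'code': 'batch-jobs/example.zip',            # job bundle key in the analysis bucket
    'target-queue': 'example-finished-queue',    # queue that receives completion messages
    'files': ['example/key/in/telemetry-published-v1.log.xz'],  # placeholder S3 keys
    'size': 1024,                                # total size of the listed files in bytes
}))
conn.close()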
|
|
@ -0,0 +1,104 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import json, yaml, sys
|
||||
from boto.cloudformation.connection import CloudFormationConnection
|
||||
from boto.exception import BotoServerError
|
||||
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
|
||||
from xml.etree import ElementTree
|
||||
|
||||
|
||||
def load_template(input_file):
|
||||
# Read YAML, dump as JSON
|
||||
with open(input_file, 'r') as input:
|
||||
try:
|
||||
return json.dumps(yaml.load(input), indent = 2), True
|
||||
except yaml.scanner.ScannerError as e:
|
||||
print >> sys.stderr, "YAML Parsing Failed:"
|
||||
print >> sys.stderr, e
|
||||
return None, False
|
||||
except yaml.parser.ParserError as e:
|
||||
print >> sys.stderr, "YAML Parsing Failed:"
|
||||
print >> sys.stderr, e
|
||||
return None, False
|
||||
|
||||
def validate_template(template, aws_key = None, aws_secret_key = None):
|
||||
# Connect to CloudFormation, if keys are None, they're loaded from
|
||||
# environment variables or boto configuration if present.
|
||||
conn = CloudFormationConnection(
|
||||
aws_access_key_id = aws_key,
|
||||
aws_secret_access_key = aws_secret_key
|
||||
)
|
||||
retval = True
|
||||
try:
|
||||
conn.validate_template(template_body = template)
|
||||
except BotoServerError as e:
|
||||
print >> sys.stderr, "Template Validation Failed:"
|
||||
for Error in ElementTree.fromstring(e.args[2]):
|
||||
if not Error.tag.endswith("Error"):
|
||||
continue
|
||||
code = "Unknown"
|
||||
msg = "Unknown"
|
||||
for element in Error:
|
||||
if element.tag.endswith("Code"):
|
||||
code = element.text
|
||||
elif element.tag.endswith("Message"):
|
||||
msg = element.text
|
||||
print >> sys.stderr, "Source: %s" % code
|
||||
print >> sys.stderr, "Message: %s" % msg
|
||||
retval = False
|
||||
conn.close()
|
||||
return retval
|
||||
|
||||
def main():
|
||||
parser = ArgumentParser(
|
||||
description = "Convert YAML to validated CloudFormation template JSON",
|
||||
epilog = "AWS credentials can also be provided by environment "
|
||||
"variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY or "
|
||||
"by boto configuration file.",
|
||||
formatter_class = ArgumentDefaultsHelpFormatter
|
||||
)
|
||||
parser.add_argument(
|
||||
"yaml_template",
|
||||
help = "CloudFormation template as YAML"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-o", "--output-file",
|
||||
help = "File to write validated JSON template",
|
||||
default = "-"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k", "--aws-key",
|
||||
help = "AWS Key"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-s", "--aws-secret-key",
|
||||
help = "AWS Secret Key"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--skip-validation",
|
||||
action = "store_true",
|
||||
help = "Skip template validation"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load template
|
||||
template, loaded = load_template(args.yaml_template)
|
||||
if not loaded:
|
||||
return 1
|
||||
|
||||
# Validate template
|
||||
if not args.skip_validation:
|
||||
if not validate_template(template, args.aws_key, args.aws_secret_key):
|
||||
return 2
|
||||
|
||||
# Output validate template
|
||||
if args.output_file == "-":
|
||||
print template
|
||||
else:
|
||||
with open(args.output_file, 'w') as output:
|
||||
output.write(template)
|
||||
|
||||
return 0
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
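The Makefiles below run this helper as ../cf-yaml-helper.py stack.yaml > stack.json. A rough sketch of driving the two functions directly, assuming the file has been made importable (e.g. copied to cf_yaml_helper.py); the file names are placeholders:

from cf_yaml_helper import load_template, validate_template

template, ok = load_template('analysis-worker-stack.yaml')  # placeholder path
if ok and validate_template(template):                      # credentials from env or boto config
    with open('analysis-worker-stack.json', 'w') as output:
        output.write(template)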
|
|
@ -0,0 +1,13 @@
|
|||
|
||||
SECRET_KEY = 'Overwrite with a secret on deployment'
|
||||
|
||||
# AWS EC2 configuration
|
||||
AWS_REGION = 'us-west-2'
|
||||
INSTANCE_TYPE = 'm1.medium'
|
||||
SECURITY_GROUPS = []
|
||||
INSTANCE_PROFILE = 'telemetry-analysis-profile'
|
||||
INSTANCE_APP_TAG = 'telemetry-analysis-debug-instance'
|
||||
EMAIL_SOURCE = 'jonasfj@mozilla.com'
|
||||
|
||||
# Bucket for storing public ssh-keys
|
||||
TEMPORARY_BUCKET = 'bucket-for-ssh-keys'
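These values are only defaults: the debug-service stack's UserData (below) appends deployment-specific assignments to the end of this file, and since later module-level assignments win, they override the defaults before server.py loads the module via app.config.from_object('config'). For illustration only, with hypothetical values, the appended lines look like:

SECRET_KEY = 'value of the serverSecret stack parameter'   # placeholder
AWS_REGION = 'us-west-2'
SECURITY_GROUPS = ['debug-instance-security-group-id']     # placeholder
INSTANCE_PROFILE = 'generated-instance-profile-name'       # placeholder
TEMPORARY_BUCKET = 'generated-temporary-bucket-name'       # placeholder
DEBUG = True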
|
|
@ -0,0 +1,284 @@
|
|||
AWSTemplateFormatVersion: '2010-09-09'
|
||||
|
||||
Description: >
|
||||
This deploys a service that allows users to spawn EC2 instances that they can
use to analyze data from the telemetry published-data bucket.
|
||||
|
||||
################################################################################
|
||||
|
||||
Parameters:
|
||||
|
||||
serverInstanceType:
|
||||
Description: Server instance type
|
||||
Type: String
|
||||
Default: m1.small
|
||||
AllowedValues:
|
||||
- m1.small
|
||||
- m1.medium
|
||||
- m1.large
|
||||
- m1.xlarge
|
||||
- m2.xlarge
|
||||
- m2.2xlarge
|
||||
- m2.4xlarge
|
||||
- c1.medium
|
||||
- c1.xlarge
|
||||
ConstraintDescription: must be a valid EC2 instance type.
|
||||
|
||||
# Currently, we are just set up to consume from a load balancer configured
# with an SSL certificate and DNS elsewhere: telemetry-dash-lb
|
||||
LoadBalancer:
|
||||
Description: Load balancer setup with the correct DNS
|
||||
Type: String
|
||||
Default: telemetry-dash-lb
|
||||
|
||||
#REMARK: We should consider removing keyName as it shouldn't be possible to
|
||||
# ssh in to the server instance
|
||||
keyName:
|
||||
Description: Name of key pair for SSH
|
||||
Type: String
|
||||
Default: jonasfj
|
||||
|
||||
serverSecret:
|
||||
Description: Secret key for signing cookies on the server
|
||||
Type: String
|
||||
|
||||
sourcesBucket:
|
||||
Description: Bucket containing sources and templates
|
||||
Type: String
|
||||
Default: jonasfj-telemetry-code
|
||||
|
||||
sourcesVersion:
|
||||
Description: Version of sources to load from sources bucket
|
||||
Type: String
|
||||
Default: '1'
|
||||
|
||||
################################################################################
|
||||
|
||||
Mappings:
|
||||
|
||||
# A map from region to 64 bit Ubuntu 13.04 backed by instance storage
|
||||
# We use instance storage to avoid stability issues with EBS also we don't
|
||||
# have to pay for IO. We'll initialize these with CloudInit, later.
|
||||
# See: http://cloud-images.ubuntu.com/locator/ec2/
|
||||
regionToAMI:
|
||||
ap-northeast-1: {AMI: ami-7f41da7e}
|
||||
ap-southeast-1: {AMI: ami-3af8b268}
|
||||
ap-southeast-2: {AMI: ami-a5960a9f}
|
||||
eu-west-1: {AMI: ami-2adc3c5d}
|
||||
sa-east-1: {AMI: ami-f1dd7bec}
|
||||
us-east-1: {AMI: ami-3d257954}
|
||||
us-west-1: {AMI: ami-20e5d265}
|
||||
us-west-2: {AMI: ami-2460f914}
|
||||
|
||||
################################################################################
|
||||
|
||||
Resources:
|
||||
|
||||
temporaryBucket:
|
||||
Type: AWS::S3::Bucket
|
||||
DeletionPolicy: Delete
|
||||
Properties:
|
||||
AccessControl: Private
|
||||
|
||||
# Security group for service server, http and ssh (well, disable ssh later)
|
||||
serverSecurityGroup:
|
||||
Type: AWS::EC2::SecurityGroup
|
||||
Properties:
|
||||
GroupDescription: telemetry-analysis debug service security group
|
||||
SecurityGroupIngress:
|
||||
- {IpProtocol: tcp, FromPort: 22, ToPort: 22, CidrIp: 0.0.0.0/0}
|
||||
- {IpProtocol: tcp, FromPort: 80, ToPort: 80, CidrIp: 0.0.0.0/0}
|
||||
|
||||
# IAM role for service server
|
||||
serverInstanceIAMRole:
|
||||
Type: AWS::IAM::Role
|
||||
DependsOn:
|
||||
- temporaryBucket
|
||||
Properties:
|
||||
AssumeRolePolicyDocument:
|
||||
Statement:
|
||||
- Effect: Allow
|
||||
Principal:
|
||||
Service:
|
||||
- ec2.amazonaws.com
|
||||
Action:
|
||||
- 'sts:AssumeRole'
|
||||
Path: /telemetry/analysis/debug-service/
|
||||
Policies:
|
||||
- PolicyName: startDebugInstancesPolicy
|
||||
PolicyDocument:
|
||||
Statement:
|
||||
- Effect: Allow
|
||||
Action:
|
||||
- 'ec2:RunInstances'
|
||||
- 'ec2:DescribeInstances'
|
||||
- 'ec2:TerminateInstances'
|
||||
- '*' # this is definitely suboptimal, but it works
|
||||
Resource: '*' # I think this works, probably not best :)
|
||||
- PolicyName: putObjectTemporaryBucketPolicy
|
||||
PolicyDocument:
|
||||
Statement:
|
||||
- Effect: Allow
|
||||
Action:
|
||||
- 's3:PutObject'
|
||||
Resource:
|
||||
- {'Fn::Join': ["", ['arn:aws:s3:::', {Ref: temporaryBucket}, '/*']]}
|
||||
- PolicyName: getObjectSourcesBucketPolicy
|
||||
PolicyDocument:
|
||||
Statement:
|
||||
- Effect: Allow
|
||||
Action:
|
||||
- 's3:GetObject'
|
||||
Resource:
|
||||
- {'Fn::Join': ["", [
|
||||
'arn:aws:s3:::',
|
||||
{Ref: sourcesBucket},
|
||||
'/v', {Ref: sourcesVersion}, '/telemetry-analysis-debug-service.tar.gz'
|
||||
]]}
|
||||
|
||||
# IAM instance profile granting serverInstanceIAMRole to service server
|
||||
serverInstanceProfile:
|
||||
Type: AWS::IAM::InstanceProfile
|
||||
DependsOn:
|
||||
- serverInstanceIAMRole
|
||||
Properties:
|
||||
Path: /telemetry/analysis/debug-service/
|
||||
Roles:
|
||||
- {Ref: serverInstanceIAMRole}
|
||||
|
||||
serverLaunchConfig:
|
||||
Type: AWS::AutoScaling::LaunchConfiguration
|
||||
DependsOn:
|
||||
- serverInstanceProfile
|
||||
- serverSecurityGroup
|
||||
- workerInstanceProfile
|
||||
- debugInstanceSecurityGroup
|
||||
Metadata:
|
||||
Comment: telemetry-analysis-debug-service
|
||||
Properties:
|
||||
KeyName: {Ref: keyName}
|
||||
ImageId: {'Fn::FindInMap': [
|
||||
'regionToAMI',
|
||||
{Ref: 'AWS::Region'},
|
||||
'AMI'
|
||||
]}
|
||||
InstanceType: {Ref: serverInstanceType}
|
||||
IamInstanceProfile: {Ref: serverInstanceProfile}
|
||||
SecurityGroups:
|
||||
- {Ref: serverSecurityGroup}
|
||||
UserData: {'Fn::Base64': {'Fn::Join': ['', [
|
||||
"#!/bin/bash\n",
|
||||
"sudo apt-get -y install python-pip git python-dateutil python-dev nginx\n",
|
||||
"sudo pip install --upgrade boto flask flask-login uwsgi\n",
|
||||
"sudo pip install git+https://github.com/garbados/flask-browserid.git\n",
|
||||
"cd /home/ubuntu\n",
|
||||
"python - << END\n",
|
||||
"from boto.s3 import connect_to_region\n",
|
||||
"s3 = connect_to_region('", {Ref: 'AWS::Region'} ,"')\n",
|
||||
"b = s3.get_bucket('", {Ref: sourcesBucket}, "', validate = False)\n",
|
||||
"k = b.get_key('v", {Ref: sourcesVersion}, "/telemetry-analysis-debug-service.tar.gz')\n",
|
||||
"k.get_contents_to_filename('sources.tar.gz')\n",
|
||||
"END\n",
|
||||
"tar -xzf sources.tar.gz\n",
|
||||
"sudo echo '42 * * * * ubuntu /home/ubuntu/terminate-expired-instances.py' >> /etc/crontab\n",
|
||||
"echo '' >> config.py\n",
|
||||
"echo \"SECRET_KEY = '", {Ref: serverSecret} , "'\" >> config.py\n",
|
||||
"echo \"AWS_REGION = '", {Ref: 'AWS::Region'} , "'\" >> config.py\n",
|
||||
"echo \"SECURITY_GROUPS = ['", {Ref: debugInstanceSecurityGroup} , "']\" >> config.py\n",
|
||||
"echo \"INSTANCE_PROFILE = '", {Ref: workerInstanceProfile} , "'\" >> config.py\n",
|
||||
"echo \"TEMPORARY_BUCKET = '", {Ref: temporaryBucket} , "'\" >> config.py\n",
|
||||
"echo \"DEBUG = True\" >> config.py\n",
|
||||
"sudo echo 'server {' >> /etc/nginx/sites-enabled/debug-service\n",
|
||||
"sudo echo ' listen 80;' >> /etc/nginx/sites-enabled/debug-service\n",
|
||||
"sudo echo ' server_name debugservice;' >> /etc/nginx/sites-enabled/debug-service\n",
|
||||
"sudo echo ' location / { try_files $uri @debugservice; }' >> /etc/nginx/sites-enabled/debug-service\n",
|
||||
"sudo echo ' location @debugservice {' >> /etc/nginx/sites-enabled/debug-service\n",
|
||||
"sudo echo ' include uwsgi_params;' >> /etc/nginx/sites-enabled/debug-service\n",
|
||||
"sudo echo ' uwsgi_pass unix:/tmp/uwsgi.sock;' >> /etc/nginx/sites-enabled/debug-service\n",
|
||||
"sudo echo ' }' >> /etc/nginx/sites-enabled/debug-service\n",
|
||||
"sudo echo '}' >> /etc/nginx/sites-enabled/debug-service\n",
|
||||
"sudo service nginx restart\n",
|
||||
"sudo -u ubuntu uwsgi --plugins python --chmod-socket=666 -s /tmp/uwsgi.sock --logto /tmp/debug-service.log -w server:app\n"
|
||||
]
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
serverAutoScaleGroup:
|
||||
Type: AWS::AutoScaling::AutoScalingGroup
|
||||
DependsOn:
|
||||
- serverLaunchConfig
|
||||
Properties:
|
||||
AvailabilityZones: {'Fn::GetAZs': ''}
|
||||
LaunchConfigurationName: {Ref: serverLaunchConfig}
|
||||
MinSize: 1
|
||||
MaxSize: 1
|
||||
LoadBalancerNames:
|
||||
- {Ref: LoadBalancer}
|
||||
|
||||
################################################################################
|
||||
|
||||
# Security group for debug instances, this should be s3 and ssh only
|
||||
debugInstanceSecurityGroup:
|
||||
Type: AWS::EC2::SecurityGroup
|
||||
Properties:
|
||||
GroupDescription: telemetry-analysis debug instance security group
|
||||
SecurityGroupIngress:
|
||||
- {IpProtocol: tcp, FromPort: 22, ToPort: 22, CidrIp: 0.0.0.0/0}
|
||||
|
||||
# IAM role for analysis debug instances
|
||||
debugInstanceIAMRole:
|
||||
Type: AWS::IAM::Role
|
||||
DependsOn:
|
||||
- temporaryBucket
|
||||
Properties:
|
||||
AssumeRolePolicyDocument:
|
||||
Statement:
|
||||
- Effect: Allow
|
||||
Principal:
|
||||
Service:
|
||||
- ec2.amazonaws.com
|
||||
Action:
|
||||
- 'sts:AssumeRole'
|
||||
Path: /telemetry/analysis/debug-instance/
|
||||
Policies:
|
||||
# Grant list access to telemetry-published-v1
|
||||
- PolicyName: telemetryPublishedDebugAccess
|
||||
PolicyDocument:
|
||||
Statement:
|
||||
- Effect: Allow
|
||||
Action:
|
||||
- 's3:ListBucket'
|
||||
Resource:
|
||||
- 'arn:aws:s3:::telemetry-published-v1'
|
||||
# Grant read access to telemetry-published-v1
|
||||
- PolicyName: telemetryPublishedReadAccess
|
||||
PolicyDocument:
|
||||
Statement:
|
||||
- Effect: Allow
|
||||
Action:
|
||||
- 's3:GetObject'
|
||||
Resource:
|
||||
- 'arn:aws:s3:::telemetry-published-v1/*'
|
||||
# Grant access to read from temporary bucket, we store SSH keys here
|
||||
- PolicyName: getObjectTemporaryBucketPolicy
|
||||
PolicyDocument:
|
||||
Statement:
|
||||
- Effect: Allow
|
||||
Action:
|
||||
- 's3:GetObject'
|
||||
Resource: {'Fn::Join': ["", [
|
||||
'arn:aws:s3:::',
|
||||
{Ref: temporaryBucket},
|
||||
'/*'
|
||||
]]}
|
||||
|
||||
# IAM instance profile granting debugInstanceIAMRole to debug instances
|
||||
workerInstanceProfile:
|
||||
Type: AWS::IAM::InstanceProfile
|
||||
DependsOn:
|
||||
- debugInstanceIAMRole
|
||||
Properties:
|
||||
Path: /telemetry/analysis/debug-instance/
|
||||
Roles:
|
||||
- {Ref: debugInstanceIAMRole}
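A hedged sketch of creating this stack from the JSON produced by cf-yaml-helper.py (the Makefile below generates debug-service-stack.json); the stack name and secret are placeholders, and CAPABILITY_IAM is required because the template creates IAM resources:

from boto import cloudformation

conn = cloudformation.connect_to_region('us-west-2')
conn.create_stack(
    'telemetry-analysis-debug-service',                        # placeholder stack name
    template_body = open('debug-service-stack.json').read(),
    parameters = [('serverSecret', 'some-long-random-secret')], # placeholder secret
    capabilities = ['CAPABILITY_IAM'],
)
conn.close()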
|
|
@ -0,0 +1,24 @@
|
|||
CFYAML = ../cf-yaml-helper.py
|
||||
|
||||
SOURCES_BUCKET = jonasfj-telemetry-code
|
||||
VERSION = 1
|
||||
|
||||
FILES = $(shell find * -name "*.py") \
|
||||
$(shell find * -name "*.sh") \
|
||||
$(shell find * -name "*.html")
|
||||
|
||||
telemetry-analysis-debug-service.tar.gz: $(FILES)
|
||||
echo $(FILES)
|
||||
tar -czf $@ $^
|
||||
|
||||
debug-service-stack.json: debug-service-stack.yaml
|
||||
$(CFYAML) $< > $@
|
||||
|
||||
put: telemetry-analysis-debug-service.tar.gz debug-service-stack.json
|
||||
aws s3 cp telemetry-analysis-debug-service.tar.gz s3://$(SOURCES_BUCKET)/v$(VERSION)/telemetry-analysis-debug-service.tar.gz
|
||||
aws s3 cp debug-service-stack.json s3://$(SOURCES_BUCKET)/v$(VERSION)/debug-service-stack.json
|
||||
|
||||
clean:
|
||||
rm -f telemetry-analysis-debug-service.tar.gz debug-service-stack.json
|
||||
|
||||
.PHONY: put
|
|
@ -0,0 +1,170 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
from argparse import ArgumentParser
|
||||
from flask import Flask, render_template, request, redirect, url_for
|
||||
from flask.ext.login import LoginManager, login_required, current_user
|
||||
from flask.ext.browserid import BrowserID
|
||||
from user import User, AnonymousUser
|
||||
from boto.ec2 import connect_to_region as ec2_connect
|
||||
from boto.ses import connect_to_region as ses_connect
|
||||
from boto.s3 import connect_to_region as s3_connect
|
||||
from urlparse import urljoin
|
||||
from uuid import uuid4
|
||||
|
||||
# Create flask app
|
||||
app = Flask(__name__)
|
||||
app.config.from_object('config')
|
||||
|
||||
# Connect to AWS
|
||||
ec2 = ec2_connect(app.config['AWS_REGION'])
|
||||
ses = ses_connect('us-east-1') # only supported region!
|
||||
s3 = s3_connect(app.config['AWS_REGION'])
|
||||
bucket = s3.get_bucket(app.config['TEMPORARY_BUCKET'], validate = False)
|
||||
|
||||
# Create login manager
|
||||
login_manager = LoginManager()
|
||||
login_manager.anonymous_user = AnonymousUser
|
||||
|
||||
# Initialize browser id login
|
||||
browser_id = BrowserID()
|
||||
|
||||
def abs_url_for(rule, **options):
|
||||
return urljoin(request.url_root, url_for(rule, **options))
|
||||
|
||||
@browser_id.user_loader
|
||||
def get_user(response):
|
||||
"""Create User from BrowserID response"""
|
||||
if response['status'] == 'okay':
|
||||
return User(response['email'])
|
||||
return User(None)
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(email):
|
||||
"""Create user from already authenticated email"""
|
||||
return User(email)
|
||||
|
||||
@login_manager.unauthorized_handler
|
||||
def unauthorized():
|
||||
return render_template('unauthorized.html')
|
||||
|
||||
# Routes
|
||||
@app.route('/', methods=["GET"])
|
||||
def index():
|
||||
return render_template('index.html', token = str(uuid4()))
|
||||
|
||||
@app.route("/spawn-debug-instance", methods=["POST"])
|
||||
@login_required
|
||||
def spawn_debug_instance():
|
||||
# Check that the user logged in is also authorized to do this
|
||||
if not current_user.is_authorized():
|
||||
return login_manager.unauthorized()
|
||||
|
||||
# Upload the user's public SSH key to the temporary bucket
|
||||
sshkey = bucket.new_key("keys/%s.pub" % request.form['token'])
|
||||
sshkey.set_contents_from_file(request.files['public-ssh-key'])
|
||||
|
||||
# Render the boot script that installs the user's SSH key on the instance
|
||||
boot_script = render_template('boot-script.sh',
|
||||
aws_region = app.config['AWS_REGION'],
|
||||
temporary_bucket = app.config['TEMPORARY_BUCKET'],
|
||||
ssh_key = sshkey.key
|
||||
)
|
||||
|
||||
# Create EC2 instance
|
||||
reservation = ec2.run_instances(
|
||||
image_id = 'ami-ace67f9c',
|
||||
security_groups = app.config['SECURITY_GROUPS'],
|
||||
user_data = boot_script,
|
||||
instance_type = app.config['INSTANCE_TYPE'],
|
||||
instance_initiated_shutdown_behavior = 'terminate',
|
||||
client_token = request.form['token'],
|
||||
instance_profile_name = app.config['INSTANCE_PROFILE']
|
||||
)
|
||||
instance = reservation.instances[0]
|
||||
|
||||
# Associate a few tags
|
||||
ec2.create_tags([instance.id], {
|
||||
"Owner": current_user.email,
|
||||
"Name": request.form['name'],
|
||||
"Application": app.config['INSTANCE_APP_TAG']
|
||||
})
|
||||
|
||||
# Send an email to the user who launched it
|
||||
params = {
|
||||
'monitoring_url': abs_url_for('monitor', instance_id = instance.id)
|
||||
}
|
||||
ses.send_email(
|
||||
source = app.config['EMAIL_SOURCE'],
|
||||
subject = ("telemetry-analysis debug instance: %s (%s) launched"
|
||||
% (request.form['name'], instance.id)),
|
||||
format = 'html',
|
||||
body = render_template('instance-launched-email.html', **params),
|
||||
to_addresses = [current_user.email]
|
||||
)
|
||||
return redirect(url_for('monitor', instance_id = instance.id))
|
||||
|
||||
@app.route("/monitor/<instance_id>", methods=["GET"])
|
||||
@login_required
|
||||
def monitor(instance_id):
|
||||
# Check that the user logged in is also authorized to do this
|
||||
if not current_user.is_authorized():
|
||||
return login_manager.unauthorized()
|
||||
|
||||
try:
|
||||
# Fetch the actual instance
|
||||
reservations = ec2.get_all_reservations(instance_ids = [instance_id])
|
||||
instance = reservations[0].instances[0]
|
||||
except IndexError:
|
||||
return "No such instance"
|
||||
|
||||
# Check that it is the owner who is logged in
|
||||
if instance.tags['Owner'] != current_user.email:
|
||||
return login_manager.unauthorized()
|
||||
|
||||
# Alright then, let's report status
|
||||
return render_template(
|
||||
'monitor.html',
|
||||
instance_state = instance.state,
|
||||
public_dns = instance.public_dns_name,
|
||||
terminate_url = abs_url_for('kill', instance_id = instance.id)
|
||||
)
|
||||
|
||||
@app.route("/kill/<instance_id>", methods=["GET"])
|
||||
@login_required
|
||||
def kill(instance_id):
|
||||
# Check that the user logged in is also authorized to do this
|
||||
if not current_user.is_authorized():
|
||||
return login_manager.unauthorized()
|
||||
|
||||
try:
|
||||
# Fetch the actual instance
|
||||
reservations = ec2.get_all_reservations(instance_ids = [instance_id])
|
||||
instance = reservations[0].instances[0]
|
||||
except IndexError:
|
||||
return "No such instance"
|
||||
|
||||
# Check that it is the owner who is logged in
|
||||
if instance.tags['Owner'] != current_user.email:
|
||||
return login_manager.unauthorized()
|
||||
|
||||
# Terminate and update instance
|
||||
instance.terminate()
|
||||
instance.update()
|
||||
|
||||
# Alright then, let's report status
|
||||
return render_template(
|
||||
'kill.html',
|
||||
instance_state = instance.state,
|
||||
public_dns = instance.public_dns_name,
|
||||
monitoring_url = abs_url_for('monitor', instance_id = instance.id)
|
||||
)
|
||||
|
||||
@app.route("/status", methods=["GET"])
|
||||
def status():
|
||||
return "OK"
|
||||
|
||||
login_manager.init_app(app)
|
||||
browser_id.init_app(app)
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host = '0.0.0.0', port = 80)
|
|
@ -0,0 +1,20 @@
|
|||
#!/bin/bash
|
||||
|
||||
cd /home/ubuntu
|
||||
|
||||
# Install a few dependencies
|
||||
sudo apt-get -y install xz-utils python-pip
|
||||
sudo pip install --upgrade boto awscli
|
||||
|
||||
# Get the user's ssh key
|
||||
python - << END
|
||||
from boto.s3 import connect_to_region
|
||||
s3 = connect_to_region('{{ aws_region }}')
|
||||
b = s3.get_bucket('{{ temporary_bucket }}', validate = False)
|
||||
k = b.get_key('{{ ssh_key }}')
|
||||
k.get_contents_to_filename('/home/ubuntu/user_key.pub')
|
||||
END
|
||||
|
||||
# Set up the user's ssh key
|
||||
cat /home/ubuntu/user_key.pub >> /home/ubuntu/.ssh/authorized_keys
|
||||
chmod 600 /home/ubuntu/.ssh/authorized_keys
|
|
@ -0,0 +1,34 @@
|
|||
<html>
|
||||
<head>
|
||||
<script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.2/jquery.min.js"></script>
|
||||
<script src="https://login.persona.org/include.js" type="text/javascript"></script>
|
||||
<script type="text/javascript">{{ auth_script|safe }}</script>
|
||||
</head>
|
||||
<body>
|
||||
{% if current_user.is_authenticated() %}
|
||||
<button id="browserid-logout">Logout</button>
|
||||
{% else %}
|
||||
<button id="browserid-login">Login</button>
|
||||
{% endif %}
|
||||
|
||||
{% if current_user.is_authenticated() %}
|
||||
{% if current_user.is_authorized() %}
|
||||
Yes mozilla email
|
||||
<br>
|
||||
<form
|
||||
action="spawn-debug-instance"
|
||||
method="post"
|
||||
enctype="multipart/form-data">
|
||||
<input type="hidden" name="token" value="{{ token }}">
|
||||
Server Name: <input type="text" name="name"><br>
|
||||
SSH-key: <input type="file" name="public-ssh-key"><br>
|
||||
<input type="submit" name="submit" value="Submit">
|
||||
</form>
|
||||
{% else %}
|
||||
Sorry only mozilla emails...
|
||||
{% endif %}
|
||||
{% else %}
|
||||
Please login...
|
||||
{% endif %}
|
||||
</body>
|
||||
</html>
|
|
@ -0,0 +1,11 @@
|
|||
Hi,<br>
|
||||
We've launched an EC2 instance with access to telemetry published data, at your
|
||||
request. As the instance powers up you can:<br>
|
||||
<ul>
|
||||
<li>Monitor instance status,</li>
|
||||
<li>Find public DNS, and</li>
|
||||
<li>Terminate the instance,</li>
|
||||
</ul>
|
||||
here: {{ monitoring_url }}<br>
|
||||
<br>
|
||||
Please be sure to kill your instance when you're done with it.
|
|
@ -0,0 +1,5 @@
|
|||
<h1>We've killed your instance</h1>
|
||||
|
||||
state: {{ instance_state }}<br>
|
||||
<br>
|
||||
You can still monitor it here {{ monitoring_url }} just to make sure it terminates.
|
|
@ -0,0 +1,5 @@
|
|||
<h1>Instance Monitoring</h1>
|
||||
|
||||
state: {{ instance_state }}<br>
|
||||
public dns: {{ public_dns }}<br>
|
||||
terminate it here: {{ terminate_url }}
|
|
@ -0,0 +1,29 @@
|
|||
#!/usr/bin/env python

from boto.ec2 import connect_to_region as ec2_connect
from boto.ses import connect_to_region as ses_connect
from dateutil.parser import parse as parse_date
from datetime import datetime
import config

ec2 = ec2_connect(config.AWS_REGION)
ses = ses_connect(config.AWS_REGION)

def main():
    reservations = ec2.get_all_reservations(
        filters = {'tag:Application': config.INSTANCE_APP_TAG}
    )
    for reservation in reservations:
        for instance in reservation.instances:
            # Terminate instances that have been running for more than 24 hours
            time = datetime.utcnow() - parse_date(instance.launch_time, ignoretz = True)
            if time.days >= 1:
                name = instance.tags.get('Name', instance.id)
                ses.send_email(
                    source = config.EMAIL_SOURCE,
                    subject = "telemetry-analysis debug instance %s terminated!" % name,
                    body = "We've terminated your instance as it has been running for over 24 hours!",
                    to_addresses = [instance.tags['Owner']]
                )
                instance.terminate()

if __name__ == '__main__':
    main()
|
|
@ -0,0 +1,24 @@
|
|||
from flask.ext.login import UserMixin, AnonymousUserMixin
|
||||
|
||||
class User(UserMixin):
|
||||
def __init__(self, email):
|
||||
self.email = email
|
||||
|
||||
def is_authenticated(self):
|
||||
return self.email != None
|
||||
|
||||
def is_authorized(self):
|
||||
return self.email.endswith('mozilla.com') or self.email.endswith('mozilla.org')
|
||||
|
||||
def is_active(self):
|
||||
return self.email != None
|
||||
|
||||
def is_anonymous(self):
|
||||
return self.email == None
|
||||
|
||||
def get_id(self):
|
||||
return self.email
|
||||
|
||||
class AnonymousUser(AnonymousUserMixin):
|
||||
def is_authorized(self):
|
||||
return False
|
|
@ -0,0 +1,52 @@
|
|||
from multiprocessing import Process
|
||||
from boto.s3.connection import S3Connection
|
||||
from boto.s3.key import Key
|
||||
import os, sys
|
||||
from traceback import print_exc
|
||||
|
||||
class DownloaderProcess(Process):
|
||||
""" Worker process that download files from queue to folder """
|
||||
def __init__(self, input_queue, output_queue,
|
||||
work_folder,
|
||||
aws_key, aws_secret_key):
|
||||
super(DownloaderProcess, self).__init__()
|
||||
self._input_queue = input_queue
|
||||
self._output_queue = output_queue
|
||||
self._work_folder = work_folder
|
||||
self._input_bucket = "telemetry-published-v1"
|
||||
self._aws_key = aws_key
|
||||
self._aws_secret_key = aws_secret_key
|
||||
self._conn = S3Connection(self._aws_key, self._aws_secret_key)
|
||||
self._bucket = self._conn.get_bucket(self._input_bucket)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
filepath = self._input_queue.get()
|
||||
self.download(filepath)
|
||||
|
||||
def download(self, filepath):
|
||||
# Get filename from path
|
||||
filename = os.path.basename(filepath)
|
||||
# Get target filepath
|
||||
target = os.path.join(self._work_folder, filename)
|
||||
# Download file
|
||||
retries = 1
|
||||
success = False
|
||||
while retries < 3:
|
||||
try:
|
||||
k = Key(self._bucket)
|
||||
k.key = filepath
|
||||
k.get_contents_to_filename(target)
|
||||
success = True
|
||||
break
|
||||
except:
|
||||
retries += 1
|
||||
print >> sys.stderr, "Error on %i'th try:" % retries
|
||||
print_exc(file = sys.stderr)
|
||||
|
||||
if success:
|
||||
# Put file on output queue
|
||||
self._output_queue.put((target, filepath))
|
||||
else:
|
||||
print >> sys.stderr, "Failed to download: %s" % filepath
|
||||
self._output_queue.put((None, filepath))
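A rough sketch of how worker.py (below) wires this process up, shown standalone with placeholder credentials and keys; the input folder must already exist:

from multiprocessing import Queue
from downloader import DownloaderProcess

download_queue, processing_queue = Queue(), Queue()
d = DownloaderProcess(download_queue, processing_queue, '/tmp/work/input',
                      'AWS_KEY', 'AWS_SECRET_KEY')   # placeholder credentials
d.start()
download_queue.put('example/key/in/telemetry-published-v1.log.xz')  # placeholder key
target, filepath = processing_queue.get()  # (local path, original key), or (None, key) on failure
d.terminate()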
|
|
@ -0,0 +1,20 @@
|
|||
import os
|
||||
|
||||
class RowCountingProcessor:
|
||||
def __init__(self):
|
||||
self.output_folder = None
|
||||
self.values_counted = 0
|
||||
|
||||
def set_output_folder(self, folder):
|
||||
self.output_folder = folder
|
||||
|
||||
def scan(self, key, value):
|
||||
self.values_counted += 1
|
||||
|
||||
def write_output(self):
|
||||
open(os.path.join(self.output_folder, "counted-rows"), "w").write(str(self.values_counted))
|
||||
|
||||
def clear_state(self):
|
||||
self.values_counted = 0
|
||||
|
||||
Processor = RowCountingProcessor
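The worker (worker.py, below) loads a job bundle with zipimporter(...).load_module("processor") and drives it through set_output_folder, scan, write_output and clear_state, so a bundle is simply a zip archive containing a processor.py like the one above at its root. A minimal sketch of building one (file names are placeholders); the resulting zip is what gets passed as job_bundle to the job-creation script below:

import zipfile

with zipfile.ZipFile('row-counter.zip', 'w') as bundle:     # placeholder bundle name
    bundle.write('my_processor.py', 'processor.py')          # must be processor.py inside the zip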
|
|
@ -0,0 +1,150 @@
|
|||
import argparse
|
||||
from boto.s3.connection import S3Connection
|
||||
from boto.s3.key import Key
|
||||
from boto import sqs
|
||||
from boto.sqs.jsonmessage import JSONMessage
|
||||
from uuid import uuid4
|
||||
import json
|
||||
from telemetry_schema import TelemetrySchema
|
||||
import sys
|
||||
|
||||
TASK_TIMEOUT = 60 * 60
|
||||
|
||||
class AnalysisJob:
|
||||
def __init__(self, cfg):
|
||||
self.job_bundle = cfg.job_bundle
|
||||
self.input_filter = TelemetrySchema(json.load(open(cfg.input_filter)))
|
||||
self.job_id = str(uuid4())
|
||||
self.target_queue = cfg.target_queue
|
||||
self.aws_key = cfg.aws_key
|
||||
self.aws_secret_key = cfg.aws_secret_key
|
||||
self.input_bucket = "telemetry-published-v1"
|
||||
|
||||
# Bucket with intermediate data for this analysis job
|
||||
self.analysis_bucket = "jonasfj-telemetry-analysis"
|
||||
|
||||
self.s3_code_path = "batch-jobs/" + self.job_id + ".zip"
|
||||
|
||||
# S3 region of operation
|
||||
self.aws_region = "us-west-2"
|
||||
self.task_size_limit = 400 * 1024 * 1024
|
||||
self.sqs_input_name = "telemetry-analysis-stack-telemetryAnalysisInput-1FD6PP5J6FY83" #"telemetry-analysis-input"
|
||||
|
||||
def get_filtered_files(self):
|
||||
""" Get tuples of name and size for all input files """
|
||||
# Setup some auxiliary functions
|
||||
allowed_values = self.input_filter.sanitize_allowed_values()
|
||||
nb_dims = len(allowed_values)
|
||||
def filter_includes(level, value):
|
||||
return self.input_filter.is_allowed(value, allowed_values[level])
|
||||
|
||||
# Iterate over all files in the bucket. This is very slow and we should be
# able to do something much smarter using prefix listing and ordering to
# break off the listing early.
|
||||
count = 0
|
||||
selected = 0
|
||||
conn = S3Connection(self.aws_key, self.aws_secret_key)
|
||||
bucket = conn.get_bucket(self.input_bucket)
|
||||
for f in bucket.list():
|
||||
count += 1
|
||||
dims = self.input_filter.get_dimensions(".", f.key)
|
||||
include = True
|
||||
for i in xrange(nb_dims):
|
||||
if not filter_includes(i, dims[i]):
|
||||
include = False
|
||||
break
|
||||
if include:
|
||||
selected += 1
|
||||
yield (f.key, f.size)
|
||||
if count % 5000 == 0:
|
||||
print "%i files listed with %i selected" % (count, selected)
|
||||
conn.close()
|
||||
|
||||
def generate_tasks(self):
|
||||
""" Generates SQS tasks, we batch small files into a single task """
|
||||
uid = str(uuid4())
|
||||
taskid = 0
|
||||
taskfiles = []
|
||||
tasksize = 0
|
||||
for key, size in self.get_filtered_files():
|
||||
# If the task has reached the desired size we yield it.
# Note, as SQS messages are limited to 65 KiB we also cap the number of
# filenames per task, for simplicity.
# boto only uses signature version 4, hence, we're limited to 65 KiB
|
||||
if 0 < len(taskfiles) and (tasksize + size > self.task_size_limit or
|
||||
len(taskfiles) > 200):
|
||||
# Reduce to only filenames, sort by size... smallest first they are
|
||||
# faster to download when handling the job
|
||||
taskfiles = [f for f,s in sorted(taskfiles, key=lambda (f,s): s)]
|
||||
yield {
|
||||
'id': uid + str(taskid),
|
||||
'code': self.s3_code_path,
|
||||
'target-queue': self.target_queue,
|
||||
'files': taskfiles,
|
||||
'size': tasksize
|
||||
}
|
||||
taskid += 1
|
||||
taskfiles = []
|
||||
tasksize = 0
|
||||
tasksize += size
|
||||
taskfiles.append((key, size))
|
||||
if len(taskfiles) > 0:
|
||||
taskfiles = [f for f,s in sorted(taskfiles, key=lambda (f,s): s)]
|
||||
yield {
|
||||
'id': uid + str(taskid),
|
||||
'code': self.s3_code_path,
|
||||
'target-queue': self.target_queue,
|
||||
'files': taskfiles,
|
||||
'size': tasksize
|
||||
}
|
||||
print "%i tasks created" % taskid
|
||||
|
||||
def put_sqs_tasks(self):
|
||||
""" Create an SQS tasks for this analysis job """
|
||||
print "Populate SQS input queue with tasks"
|
||||
# Connect to SQS in the desired region
|
||||
conn = sqs.connect_to_region(
|
||||
self.aws_region,
|
||||
aws_access_key_id = self.aws_key,
|
||||
aws_secret_access_key = self.aws_secret_key
|
||||
)
|
||||
# Get the input queue
|
||||
queue = conn.get_queue(self.sqs_input_name)
|
||||
queue.set_message_class(JSONMessage)
|
||||
# Populate queue with tasks
|
||||
for task in self.generate_tasks():
|
||||
msg = queue.new_message(body = task)
|
||||
queue.write(msg)
|
||||
conn.close()
|
||||
|
||||
def setup(self):
|
||||
self.upload_job_bundle()
|
||||
self.put_sqs_tasks()
|
||||
print "Uploaded with job_id: %s" % self.job_id
|
||||
|
||||
def run(self):
|
||||
self.create_spot_request()
|
||||
|
||||
def upload_job_bundle(self):
|
||||
""" Upload job bundle to S3 """
|
||||
conn = S3Connection(self.aws_key, self.aws_secret_key)
|
||||
bucket = conn.get_bucket(self.analysis_bucket)
|
||||
k = Key(bucket)
|
||||
k.key = self.s3_code_path
|
||||
k.set_contents_from_filename(self.job_bundle)
|
||||
conn.close()
|
||||
|
||||
|
||||
def main():
|
||||
p = argparse.ArgumentParser(description='Run analysis job', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
p.add_argument("job_bundle", help="The analysis bundle to run")
|
||||
p.add_argument("-k", "--aws-key", help="AWS Key")
|
||||
p.add_argument("-s", "--aws-secret-key", help="AWS Secret Key")
|
||||
p.add_argument("-f", "--input-filter", help="File containing filter spec", required=True)
|
||||
p.add_argument("-q", "--target-queue", help="SQS queue for communicating finished tasks", required=True)
|
||||
job = AnalysisJob(p.parse_args())
|
||||
job.setup()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
|
@ -0,0 +1,12 @@
|
|||
FILES = downloader.py manager.py telemetry_schema.py worker.py telemetry-analysis-worker.sh
|
||||
|
||||
telemetry-analysis-worker.tar.gz: $(FILES)
|
||||
tar -czf telemetry-analysis-worker.tar.gz $^
|
||||
|
||||
put-worker-bundle: telemetry-analysis-worker.tar.gz
|
||||
s3cmd put -f telemetry-analysis-worker.tar.gz s3://jonasfj-telemetry-analysis/bundles/
|
||||
|
||||
clean:
|
||||
rm -f telemetry-analysis-worker.tar.gz
|
||||
|
||||
.PHONY: put-worker-bundle
|
|
@ -0,0 +1,38 @@
|
|||
import argparse
|
||||
from multiprocessing import Queue, cpu_count
|
||||
from threading import Thread
|
||||
from worker import AnalysisWorker
|
||||
import os, sys
|
||||
|
||||
class Manager(Thread):
|
||||
def __init__(self, aws_key, aws_secret, work_dir, index):
|
||||
super(Manager, self).__init__()
|
||||
self.aws_key = aws_key
|
||||
self.aws_secret = aws_secret
|
||||
self.work_dir = os.path.join(work_dir, "worker-%i" % index)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
# A manager is essentially a guy who creates/hires a worker
|
||||
worker = AnalysisWorker(self.aws_key, self.aws_secret, self.work_dir)
|
||||
# Puts the worker to work
|
||||
worker.start()
|
||||
# Sits back and waits for the worker to die
|
||||
worker.join() #TODO, timeout and kill worker process tree, also only retry failed sqs messages 3 times
|
||||
# Then goes on to create the next worker :)
|
||||
continue
|
||||
|
||||
def main():
|
||||
p = argparse.ArgumentParser(description='Run analysis worker', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
p.add_argument("-q", "--queue", help="SQS input queue")
|
||||
p.add_argument("-k", "--aws-key", help="AWS Key")
|
||||
p.add_argument("-s", "--aws-secret-key", help="AWS Secret Key")
|
||||
p.add_argument("-w", "--work-dir", help="Location to put temporary work files")
|
||||
cfg = p.parse_args()
|
||||
|
||||
for index in xrange(0, cpu_count()):
|
||||
manager = Manager(cfg.aws_key, cfg.aws_secret_key, cfg.work_dir, index)
|
||||
manager.start()
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
|
@ -0,0 +1,30 @@
|
|||
{
|
||||
"version": 1,
|
||||
"dimensions": [
|
||||
{
|
||||
"field_name": "reason",
|
||||
"allowed_values": ["saved_session"]
|
||||
},
|
||||
{
|
||||
"field_name": "appName",
|
||||
"allowed_values": "*"
|
||||
},
|
||||
{
|
||||
"field_name": "appUpdateChannel",
|
||||
"allowed_values": ["release", "aurora", "nightly", "beta", "nightly-ux"]
|
||||
},
|
||||
{
|
||||
"field_name": "appVersion",
|
||||
"allowed_values": "*"
|
||||
},
|
||||
{
|
||||
"field_name": "appBuildID",
|
||||
"allowed_values": "*"
|
||||
},
|
||||
{
|
||||
"field_name": "submission_date",
|
||||
"allowed_values": ["20131013"]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
@ -0,0 +1,192 @@
|
|||
import argparse
|
||||
from multiprocessing import Queue, Process
|
||||
from traceback import print_exc
|
||||
from downloader import DownloaderProcess
|
||||
import os, sys, shutil
|
||||
from boto import sqs
|
||||
from boto.sqs.jsonmessage import JSONMessage
|
||||
from boto.s3.connection import S3Connection
|
||||
from boto.s3.key import Key
|
||||
from subprocess import Popen, PIPE
|
||||
from zipimport import zipimporter
|
||||
import errno
|
||||
|
||||
def mkdirp(path):
|
||||
try:
|
||||
os.makedirs(path)
|
||||
except OSError as e:
|
||||
if e.errno != errno.EEXIST or not os.path.isdir(path):
|
||||
raise
|
||||
|
||||
NUMBER_OF_DOWNLOADERS = 4
|
||||
|
||||
class AnalysisWorker(Process):
|
||||
""" Analysis worker that finishes tasks from SQS """
|
||||
def __init__(self, aws_key, aws_secret_key, work_dir):
|
||||
super(AnalysisWorker, self).__init__()
|
||||
self.aws_key = aws_key
|
||||
self.aws_secret_key = aws_secret_key
|
||||
self.work_folder = work_dir
|
||||
self.download_queue = Queue()
|
||||
self.processing_queue = Queue()
|
||||
self.input_folder = os.path.join(self.work_folder, "input")
|
||||
self.output_folder = os.path.join(self.work_folder, "output")
|
||||
|
||||
# Bucket with intermediate data for this analysis job
|
||||
self.analysis_bucket_name = "jonasfj-telemetry-analysis"
|
||||
|
||||
# S3 region of operation
|
||||
self.aws_region = "us-west-2"
|
||||
self.sqs_input_name = "telemetry-analysis-input"
|
||||
|
||||
def setup(self):
|
||||
print "Worker setting up"
|
||||
# Remove work folder, no failures allowed
|
||||
if os.path.exists(self.work_folder):
|
||||
shutil.rmtree(self.work_folder, ignore_errors = False)
|
||||
|
||||
# Create folders as needed
|
||||
mkdirp(self.input_folder)
|
||||
mkdirp(self.output_folder)
|
||||
|
||||
# Launch NUMBER_OF_DOWNLOADERS downloader processes
|
||||
self.downloaders = []
|
||||
for i in xrange(0, NUMBER_OF_DOWNLOADERS):
|
||||
d = DownloaderProcess(self.download_queue, self.processing_queue,
|
||||
self.input_folder,
|
||||
self.aws_key, self.aws_secret_key)
|
||||
self.downloaders.append(d)
|
||||
d.start()
|
||||
|
||||
# Connect to SQS
|
||||
self.sqs_conn = sqs.connect_to_region(
|
||||
self.aws_region,
|
||||
aws_access_key_id = self.aws_key,
|
||||
aws_secret_access_key = self.aws_secret_key
|
||||
)
|
||||
self.sqs_input_queue = self.sqs_conn.get_queue(self.sqs_input_name)
|
||||
self.sqs_input_queue.set_message_class(JSONMessage)
|
||||
|
||||
# Connect to S3
|
||||
self.s3_conn = S3Connection(self.aws_key, self.aws_secret_key)
|
||||
self.analysis_bucket = self.s3_conn.get_bucket(self.analysis_bucket_name)
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
self.setup()
|
||||
msgs = self.sqs_input_queue.get_messages(num_messages = 1)
|
||||
if len(msgs) > 0:
|
||||
self.process_message(msgs[0])
|
||||
except:
|
||||
print >> sys.stderr, "Failed job, cleaning up after this:"
|
||||
print_exc(file = sys.stderr)
|
||||
finally:
|
||||
self.teardown()
|
||||
|
||||
def teardown(self):
|
||||
# Murder downloaders
|
||||
for d in self.downloaders:
|
||||
d.terminate()
|
||||
|
||||
# Remove work folder, as best possible
|
||||
shutil.rmtree(self.work_folder, ignore_errors = True)
|
||||
|
||||
# Close service connections
|
||||
self.sqs_conn.close()
|
||||
self.s3_conn.close()
|
||||
|
||||
def process_message(self, msg):
|
||||
# Start downloading all files
|
||||
for f in msg["files"]:
|
||||
self.download_queue.put(f)
|
||||
|
||||
print "Processing Message"
|
||||
print "id: %s" % msg["id"]
|
||||
print "code: %s" % msg["code"]
|
||||
print "files: %i" % len(msg["files"])
|
||||
print "size: %s" % msg["size"]
|
||||
print "target-queue: %s" % msg["target-queue"]
|
||||
|
||||
# Download the job bundle
|
||||
job_bundle_target = os.path.join(self.work_folder, "job.zip")
|
||||
k = Key(self.analysis_bucket)
|
||||
k.key = msg["code"]
|
||||
k.get_contents_to_filename(job_bundle_target)
|
||||
|
||||
# zipimport job bundle
|
||||
proc_module = zipimporter(job_bundle_target).load_module("processor")
|
||||
processor = proc_module.Processor()
|
||||
processor.set_output_folder(self.output_folder)
|
||||
|
||||
# maintain set of files
|
||||
fileset = set(msg["files"])
|
||||
|
||||
# Wait for downloads to finish, until all files are processed
|
||||
while len(fileset) != 0:
|
||||
(path, fileentry) = self.processing_queue.get()
|
||||
print "Processing %s" % fileentry
|
||||
if fileentry in fileset and path != None:
|
||||
self.process_file(processor, path)
|
||||
fileset.remove(fileentry)
|
||||
|
||||
# Ask processor to write output
|
||||
processor.write_output()
|
||||
processor.clear_state()
|
||||
|
||||
# Upload result to S3
|
||||
target_prefix = "output/" + msg["id"] + "/"
|
||||
|
||||
#TODO multi-process uploaders like downloaders
|
||||
k = Key(self.analysis_bucket)
|
||||
for path, folder, files in os.walk(self.output_folder):
|
||||
for f in files:
|
||||
k.key = target_prefix + os.path.relpath(os.path.join(path, f), self.output_folder)
|
||||
k.set_contents_from_filename(os.path.join(path, f))
|
||||
|
||||
# Delete SQS message
|
||||
self.sqs_input_queue.delete_message(msg)
|
||||
|
||||
# Get output SQS queue
|
||||
target_queue = self.sqs_conn.get_queue(msg["target-queue"])
|
||||
target_queue.set_message_class(JSONMessage)
|
||||
|
||||
m = target_queue.new_message(body = {'id': msg["id"]})
|
||||
target_queue.write(m)
|
||||
|
||||
print "Finished task: %s" % msg["id"]
|
||||
|
||||
def process_file(self, processor, path):
|
||||
self.open_compressor(path)
|
||||
line_nb = 0
|
||||
for line in self.decompressor.stdout:
|
||||
line_nb += 1
|
||||
try:
|
||||
key, value = line.split("\t", 1)
|
||||
processor.scan(key, value)
|
||||
except:
|
||||
print >> sys.stderr, ("Bad input line: %i of %s" %
|
||||
(line_nb, self.filename))
|
||||
print_exc(file = sys.stderr)
|
||||
self.close_compressor()
|
||||
|
||||
|
||||
def open_compressor(self, path):
|
||||
self.raw_handle = open(path, "rb")
|
||||
self.decompressor = Popen(
|
||||
['xz', '-d', '-c'],
|
||||
bufsize = 65536,
|
||||
stdin = self.raw_handle,
|
||||
stdout = PIPE,
|
||||
stderr = sys.stderr
|
||||
)
|
||||
|
||||
def close_compressor(self):
|
||||
self.decompressor.stdout.close()
|
||||
self.raw_handle.close()
|
||||
#if self.decompressor.poll() != 0:
|
||||
# print >> sys.stderr, "decompressor exited: %s" % self.decompressor.returncode
|
||||
# self.decompressor.kill()
|
||||
self.decompressor = None
|
||||
self.raw_handle = None
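Once a task is done the worker posts {'id': <task id>} to the task's target-queue. A rough sketch of polling for those completion messages; the queue name is a placeholder and must match the --target-queue passed to the job-creation script:

import time
import boto.sqs
from boto.sqs.jsonmessage import JSONMessage

conn = boto.sqs.connect_to_region('us-west-2')
queue = conn.get_queue('example-finished-queue')   # placeholder, same as --target-queue
queue.set_message_class(JSONMessage)
while True:
    for msg in queue.get_messages(num_messages = 10):
        print "finished task:", msg.get_body()['id']
        queue.delete_message(msg)
    time.sleep(5)   # simple polling; the input queue itself relies on long polling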
|
||||
|
||||
|