# metrics-aws-clusters/server.py
# (458 lines, 20 KiB, Python)

#!/usr/bin/env python
from argparse import ArgumentParser
from flask import Flask, render_template, g, request, redirect, url_for, jsonify,session
from flask.ext.login import LoginManager, login_required, current_user
from flask.ext.browserid import BrowserID
from user import User, AnonymousUser
from urlparse import urljoin
from sqlalchemy import create_engine, MetaData
from sqlalchemy.sql import select, func
from tempfile import mkstemp
import json
import re
import os.path
from boto.emr.connection import EmrConnection
from boto.exception import EmrResponseError
from boto.emr.instance_group import InstanceGroup
from boto.emr import connect_to_region as emr_connect, BootstrapAction,JarStep
import boto
import time
import misc
import functools
# Create and configure the Flask app.
app = Flask(__name__)
app.debug = True
app.config.from_object('config')
EmrConnection.DefaultRegionName = "us-east-1"

import datetime
# Server start time; used by the price-history caches to judge staleness.
start_time = datetime.datetime.now()

# Load AWS credentials from the secrets file:
# line 1 = secret access key, line 2 = access key id.
with open(app.config['FILE_FOR_SECKEY']) as f:
    cred_lines = f.readlines()
app.config['SECRET_KEY'] = cred_lines[0].rstrip()
app.config['ACCESS_ID'] = cred_lines[1].rstrip()

# One EMR connection and one EC2 connection, shared by all requests.
botoconn = EmrConnection(aws_access_key_id=app.config['ACCESS_ID'],
                         aws_secret_access_key=app.config['SECRET_KEY'])
ec2boto = boto.connect_ec2(app.config['ACCESS_ID'], app.config['SECRET_KEY'])
# Directory of this source file; per-user key files are stored alongside it.
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))

# Create login manager
login_manager = LoginManager()
login_manager.anonymous_user = AnonymousUser
# Initialize browser id login
browser_id = BrowserID()

# Percentile summary used to turn a spot-price history into a bid estimate.
# (duplicate "import functools" removed; it is already imported above)
pctfunction = functools.partial(misc.percentile, percent=app.config['SPOT_PRICE_PCT_RULE'])
class back(object):
    """Remember the page a user came from and redirect there afterwards.

    Based on http://flask.pocoo.org/snippets/120/
    """
    cfg = app.config.get
    cookie = cfg('REDIRECT_BACK_COOKIE', 'back')
    default_view = cfg('REDIRECT_BACK_DEFAULT', 'index')

    @staticmethod
    def anchor(func, cookie=cookie):
        """Decorator: record the decorated view's URL in the session."""
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            session[cookie] = request.url
            return func(*args, **kwargs)
        return wrapped

    @staticmethod
    def url(default=default_view, cookie=cookie):
        """Return the stored return-URL, falling back to the default view."""
        return session.get(cookie, url_for(default))

    @staticmethod
    def redirect(default=default_view, cookie=cookie):
        """Issue a redirect to the stored return-URL."""
        return redirect(back.url(default, cookie))

# Shadow the class with a singleton instance; only the instance is used below.
back = back()
@browser_id.user_loader
def get_user(response):
    """Create a User from a BrowserID verification response.

    A failed verification yields User(None) (an anonymous user).
    """
    email = response['email'] if response['status'] == 'okay' else None
    return User(email)
@login_manager.user_loader
def load_user(email):
    """Re-create the User object from an already-authenticated email."""
    return User(email)
@login_manager.unauthorized_handler
def unauthorized():
    """Render the public landing page when an unauthenticated user hits a protected view."""
    return render_template('index.html')
@app.before_first_request
def initialize_jobs():
    """Hook run once before the first request; nothing to initialize yet."""
    pass
@app.teardown_appcontext
def close_db(error):
    """Closes the database again at the end of the request."""
    # No per-request DB connection is opened anywhere, so nothing to close.
    pass
# Hard ceilings (USD/hour) on the spot bid per task-node instance type.
task_node_max_price = {'m3.xlarge': 0.28, 'm3.2xlarge': 0.56, 'm1.large': 0.175,
                       'm1.medium': 0.087, 'c3.xlarge': 0.210}

def summarizeSpot(prices, mtype=None):
    """Summarize a spot-price history, capped at the per-type maximum bid.

    Raises KeyError when mtype has no entry in task_node_max_price
    (including the default mtype=None).
    """
    estimate = pctfunction(prices)
    # Never bid above the configured ceiling for this instance type.
    return min(estimate, task_node_max_price[mtype])
import datetime
# Spot-price cache: (instance type, lookback days) -> (fetched_at, [price, ...]).
cached_price_history = {}
# Seconds before a cached price list is considered stale.
PRICE_CACHE_TTL = 60

def getSpotPriceHistoryFor(conn, mtype, numdays, detail=False):
    """Return {'cached': bool, 'prices': [float, ...]} for `mtype`.

    Queries EC2 for up to `numdays` of "Linux/UNIX (Amazon VPC)" spot-price
    history, caching the result per (mtype, numdays) for PRICE_CACHE_TTL
    seconds.  `detail` is accepted for interface compatibility but unused.
    """
    now = datetime.datetime.now()
    key = (mtype, numdays)
    entry = cached_price_history.get(key)
    # BUG FIX: staleness used to be measured from *server start* via
    # timedelta.seconds (which wraps every day), so after the first minute
    # of uptime every call re-queried EC2.  Track fetch time per cache key.
    cached = (entry is not None
              and (now - entry[0]).total_seconds() <= PRICE_CACHE_TTL)
    if not cached:
        start = (now - datetime.timedelta(days=numdays)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
        history = conn.get_spot_price_history(
            start_time=start,
            product_description="Linux/UNIX (Amazon VPC)",
            instance_type=mtype)
        cached_price_history[key] = (now, [h.price for h in history])
    prices = cached_price_history[key][1]
    print({'mtype': mtype, 'num': numdays, 'cached': cached, 'prices': prices})
    return {'cached': cached, 'prices': prices}
def get_clusters_for_user(conn, username):
    """Return (error_flag, payload) describing the user's visible EMR clusters.

    On success: (False, [cluster-info dicts]).
    On EMR API error: (True, the EmrResponseError instance).
    """
    def belongs_to_user(clusdesc):
        # Relies on the "user" tag; breaks if the user deletes their tags.
        for tag in clusdesc.tags:
            if tag.key == "user" and tag.value in (username, "metricsall"):
                return True
        return False

    def node_summary(cluster_id):
        """Summarize CORE / on-demand TASK / spot TASK instance counts."""
        groups = conn.list_instance_groups(cluster_id)
        core = taskondmd = taskdmdwant = spotrun = spotwant = 0
        if groups is not None:
            for ig in groups.instancegroups:
                if ig.instancegrouptype == 'CORE':
                    core += int(ig.runninginstancecount)
                elif ig.instancegrouptype == 'TASK' and ig.market == "ON_DEMAND":
                    taskondmd += int(ig.runninginstancecount)
                    taskdmdwant += int(ig.requestedinstancecount)
                elif ig.instancegrouptype == 'TASK' and ig.market == "SPOT":
                    spotrun += int(ig.runninginstancecount)
                    spotwant += int(ig.requestedinstancecount)
        # BUG FIX: previously returned a 2-tuple when groups was None, which
        # broke the caller's 3-way unpack with a ValueError.
        return (core,
                "%s[rqstd: %s]" % (taskondmd, taskdmdwant),
                "%s[rqstd: %s]" % (spotrun, spotwant))

    def master_ip(clusdesc):
        # BUG FIX: use the parameter rather than the enclosing loop variable.
        try:
            return clusdesc.masterpublicdnsname
        except AttributeError:
            return "..."

    def ready_time(clusdesc):
        """Best-effort human-readable ready time in US/Pacific."""
        try:
            ready = str(clusdesc.status.timeline.readydatetime)
        except AttributeError:
            return "..."
        try:
            import time, pytz, calendar
            parsed = time.strptime(ready[:-5], "%Y-%m-%dT%H:%M:%S")
            return (datetime.datetime.fromtimestamp(
                        calendar.timegm(parsed),
                        tz=pytz.timezone("US/Pacific")).strftime("%Y-%m-%d %H:%M")
                    + " PDT")
        except Exception:
            # Fall back to the raw timestamp if parsing/pytz fails.
            return ready

    try:
        listing = conn.list_clusters(
            cluster_states=["TERMINATING", "BOOTSTRAPPING", "WAITING",
                            "RUNNING", "STARTING"])
        result = []
        if listing is not None:
            index = 0
            for summary in listing.clusters:
                clusdec = conn.describe_cluster(summary.id)
                if not belongs_to_user(clusdec):
                    continue
                index += 1
                entry = {'index': str(index),
                         'id': clusdec.id,
                         'name': clusdec.name,
                         'instancehrs': getattr(clusdec, 'normalizedinstancehours', 0),
                         'ip': master_ip(clusdec),
                         'state': clusdec.status.state,
                         'ready': ready_time(clusdec)}
                (entry['corenodes'], entry['tasknodesondmd'],
                 entry['tasknodesspot']) = node_summary(clusdec.id)
                result.append(entry)
        return False, result
    except EmrResponseError as e:
        # BUG FIX: print_tb() expects a traceback object, not an exception;
        # print_exc() reports the in-flight exception correctly.
        import traceback
        traceback.print_exc()
        return True, e
# Cache for the /spotprices endpoint: (instance type, days) -> price dicts.
spotcache = {}

@app.route("/spotprices", methods=["GET"])
def spotprices():
    """JSON endpoint: spot-price history for ?type=...&numdays=... ."""
    timenow = datetime.datetime.now()

    def recomp_price(ndays, mtype):
        # BUG FIX: the cache was keyed with the enclosing `numdays` variable
        # instead of the `ndays` parameter; use the parameter consistently.
        key = (mtype, ndays)
        if (timenow - start_time).seconds > 60 or spotcache.get(key) is None:
            tn = (timenow - datetime.timedelta(days=ndays)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
            history = ec2boto.get_spot_price_history(
                start_time=tn,
                product_description="Linux/UNIX (Amazon VPC)",
                instance_type=mtype)
            spotcache[key] = [{'price': h.price, 'time': h.timestamp} for h in history]
            return {'cached': False, 'prices': spotcache[key]}
        return {'cached': True, 'prices': spotcache[key]}

    formdict = request.args.to_dict()
    print(formdict)
    nodetype = formdict.get("type", app.config['MASTER_INSTANCE_TYPE'])
    numdays = int(formdict.get("numdays", 3))
    print(numdays)
    priceInfo = recomp_price(numdays, nodetype)
    priceInfo['node'] = nodetype
    priceInfo['numdays'] = numdays
    return jsonify(priceInfo)
@app.route("/modify_cluster", methods=["POST", "GET"])
@login_required
def modify_cluster():
    """Apply kill / resize operations submitted from the cluster form."""
    def adjust_task_group(cluster_id, count, market):
        """Create or resize this user's TASK instance group of `market` kind.

        - group missing and count > 0  -> create a new group of `count` nodes
        - group exists                 -> resize it to `count` (even to 0)
        - group missing and count == 0 -> nothing to do
        """
        groups = botoconn.list_instance_groups(cluster_id)
        thegroup = None
        for ig in groups.instancegroups:
            if (ig.instancegrouptype == "TASK" and ig.market == market
                    and current_user.email in ig.name):
                thegroup = ig
                break
        if thegroup is None and count > 0:
            if market == "SPOT":
                # Bid the percentile-summarized recent spot price.
                priceInfo = getSpotPriceHistoryFor(ec2boto,
                                                   app.config['TASK_NODE_TYPE'],
                                                   app.config['SPOT_DAY_HIST'])
                bid = str(round(summarizeSpot(priceInfo['prices'],
                                              app.config['TASK_NODE_TYPE']), 3))
                thegroup = InstanceGroup(count, "TASK", app.config['TASK_NODE_TYPE'],
                                         "SPOT", "task spot " + current_user.email, bid)
            else:
                thegroup = InstanceGroup(count, "TASK", app.config['TASK_NODE_TYPE'],
                                         "ON_DEMAND", "task dmd " + current_user.email)
            botoconn.add_instance_groups(cluster_id, [thegroup])
        elif thegroup is not None:
            botoconn.modify_instance_groups([thegroup.id], [count])

    if not current_user.is_authorized():
        return login_manager.unauthorized()

    formdict = request.form.to_dict()
    # Collect the cluster ids the form refers to (fields named "clus...").
    clusops = {}
    for key, value in formdict.iteritems():
        if key.startswith("clus"):
            clusops[value] = {}
    for key in clusops:
        clusops[key]['kill'] = formdict.get(key + "-kill", "off")
        # -1 means "leave this group's node count unchanged".
        spot = formdict.get(key + "-spot")
        dmd = formdict.get(key + "-ondmd")
        clusops[key]['newspot'] = int(spot) if spot not in (None, '', 'donothing') else -1
        clusops[key]['newdmd'] = int(dmd) if dmd not in (None, '', 'donothing') else -1
    for key, ops in clusops.iteritems():
        if ops['kill'] == 'on':
            botoconn.terminate_jobflow(key)
        else:
            # Counts are either 0 or >0; in both cases run a modification.
            if ops['newspot'] >= 0:
                adjust_task_group(key, ops['newspot'], 'SPOT')
            if ops['newdmd'] >= 0:
                adjust_task_group(key, ops['newdmd'], 'ON_DEMAND')
    return back.redirect()
def checkIfUserSubmittedKey():
    """Return (key_file_exists, stored public key or None) for the current user."""
    path = os.path.join(__location__, current_user.email + ".datjson")
    try:
        # BUG FIX: use a context manager so the file handle is closed
        # (the old code opened the file and never closed it).
        with open(path) as f:
            data = json.load(f)
    except IOError:
        return False, None
    # .get avoids the uncaught KeyError the old code raised when the file
    # existed but lacked a 'pubkey' entry.
    return True, data.get('pubkey')
# Per-user cache of the cluster listing, to avoid hammering the EMR API.
user_clusters = {}

@app.route('/', methods=["GET"])
@back.anchor
def index():
    """Main page: the user's clusters plus the current spot-price summary."""
    from datetime import datetime
    from pytz import timezone
    date_format = '%m/%d/%Y %H:%M:%S %Z'
    priceInfo = getSpotPriceHistoryFor(ec2boto, app.config['TASK_NODE_TYPE'],
                                       app.config['SPOT_DAY_HIST'])
    spot_summary = summarizeSpot(priceInfo['prices'], app.config['TASK_NODE_TYPE'])
    if not current_user.is_authorized():
        return render_template('index.html')
    # Serve the cluster list from the per-user cache, refreshing it every
    # REFRESH_TIME seconds (always refreshed for the admin account).
    cached = user_clusters.get(current_user.email)
    getnew = True
    if cached is not None:
        err = cached['err']
        ls = cached['ls']
        timecheck = cached['timecheck']
        getnew = (int(time.time()) - cached['last'] > app.config['REFRESH_TIME']
                  or current_user.email == 'sguha@mozilla.com')
    if getnew:
        timecheck = datetime.now(tz=timezone("US/Pacific")).strftime(date_format)
        err, ls = get_clusters_for_user(botoconn, current_user.email)
        user_clusters[current_user.email] = {'last': int(time.time()), 'err': err,
                                             'ls': ls, 'timecheck': timecheck}
    lsstr = str(ls) if err else ""
    userhaskey, content = checkIfUserSubmittedKey()
    return render_template('index.html', err=err, clusters=ls, message=lsstr,
                           userhaskey=userhaskey, pubkey=content,
                           timecheck="Last checked at " + timecheck,
                           spot="$" + str(spot_summary))
def writeKeyIfNotAlready(akey):
    """Merge the user's public SSH key into their .datjson file.

    An empty akey means the form omitted the key (one is already on file),
    so there is nothing to write.
    """
    if akey == "":
        return
    path = os.path.join(__location__, current_user.email + ".datjson")
    try:
        # BUG FIX: read via a context manager; the old code leaked the handle.
        with open(path) as f:
            content = json.load(f)
    except IOError:
        # Most likely the user's first upload.
        content = {}
    content['pubkey'] = akey
    with open(path, 'w') as outfile:
        json.dump(content, outfile, sort_keys=True, indent=4, ensure_ascii=False)
@app.route("/status", methods=["GET"])
def status():
    """Health-check endpoint; always returns "OK"."""
    return "OK"
# Per-user count of clusters launched, used to auto-number descriptions.
numclusters = {}

@app.route("/new_cluster", methods=["POST"])
def newcluster():
    """Launch a new EMR cluster from the submitted form and tag it to the user."""
    # Instance types with enough local disk/RAM for the extra mapper-memory
    # settings below.
    _DISK_OK_TYPES = ('m3.xlarge', 'm3.2xlarge', 'c3.2xlarge', 'c3.4xlarge')

    def instance_types_support_disk_tuning():
        return all(app.config[k] in _DISK_OK_TYPES
                   for k in ('MASTER_INSTANCE_TYPE', 'CORE_INSTANCE_TYPE',
                             'TASK_NODE_TYPE'))

    formdict = request.form.to_dict()
    pubkey = request.files['public-ssh-key'].read()
    writeKeyIfNotAlready(pubkey)

    desc = formdict['clusdesc']
    if desc == '':
        i = numclusters.get(current_user.email, 0)
        desc = "Cluster #" + str(i) + " for " + current_user.email

    numnodes = formdict['numnodes']
    numnodes = app.config['DEFAULT_CORE_NODES'] if numnodes == '' else int(numnodes)

    ## see https://anujjaiswal.wordpress.com/2015/02/10/aws-emr-high-performance-bootstrap-action/
    hadoop_config_options = [
        "-m", "mapred.map.child.java.opts=-Xmx1024m",
        "-m", "mapred.reduce.child.java.opts=-Xmx1024m",
        "-y", "yarn.resourcemanager.scheduler.class=org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler",
        # Aggregate YARN logs to S3.
        "-y", "yarn.log-aggregation-enable=true",
        "-y", "yarn.log-aggregation.retain-seconds=-1",
        "-y", "yarn.log-aggregation.retain-check-interval-seconds=3000",
        "-y", "yarn.nodemanager.remote-app-log-dir=s3://mozillametricsemrscripts/aggreglogs",
        # Compress intermediate and final map-reduce output.
        "-m", "mapreduce.map.output.compress=true",
        "-m", "mapreduce.output.fileoutputformat.compress=true",
        # Multipart S3 uploads for large files.
        "-c", "fs.s3n.multipart.uploads.enabled=true",
        "-c", "fs.s3n.multipart.uploads.split.size=524288000",
    ]
    if instance_types_support_disk_tuning():
        hadoop_config_options += [
            "-m", "mapreduce.map.memory.mb=4096",
            "-m", "mapreduce.map.java.opts=-Xmx4096m",
        ]
    setup_hadoop_boostrap = BootstrapAction(
        'Configure Hadoop',
        's3://elasticmapreduce/bootstrap-actions/configure-hadoop',
        hadoop_config_options)
    setup_rhipekickstart_bootstrap = BootstrapAction(
        'KickStart Rhipe',
        's3://mozillametricsemrscripts/kickstartrhipe.sh',
        ['--public-key', pubkey,
         '--timeout', app.config['CLUSTER_LIFE_MIN']])
    finalStep = JarStep(
        name='Finalize HDFS',
        jar='s3://elasticmapreduce/libs/script-runner/script-runner.jar',
        step_args=['s3://mozillametricsemrscripts/final.step.sh'])
    jobid = botoconn.run_jobflow(
        name=desc,
        ec2_keyname='sguhaMozillaEast',
        log_uri="s3://mozillametricsemrscripts/logs",
        enable_debugging=True,
        master_instance_type=app.config['MASTER_INSTANCE_TYPE'],
        slave_instance_type=app.config['CORE_INSTANCE_TYPE'],
        num_instances=numnodes + 1,  # +1 for the master node
        ami_version=app.config['AMI_VERSION'],
        visible_to_all_users=True,
        keep_alive=True,
        bootstrap_actions=[setup_hadoop_boostrap, setup_rhipekickstart_bootstrap],
        steps=[finalStep])
    # Tag the jobflow so get_clusters_for_user can find it later.
    botoconn.add_tags(jobid, {
        "user": current_user.email,
    })
    numclusters[current_user.email] = numclusters.get(current_user.email, 0) + 1
    return back.redirect()
@app.route('/graphs', methods=["GET"])
def graphs():
    """Debug page: dump auth details to the log, then render the index."""
    if current_user.is_authenticated():
        print(current_user.is_authorized())
        print(current_user.email)
        print(dir(current_user))
    return render_template('index.html')
# Wire the authentication extensions into the app.
login_manager.init_app(app)
browser_id.init_app(app)

if __name__ == '__main__':
    # Command-line options for running the development server directly.
    parser = ArgumentParser(description='Tickle the Mozilla Monster')
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", default=8081, type=int)
    parser.add_argument("--db-url", default='sqlite:///mozmonster.db')
    args = parser.parse_args()
    app.config.update(DB_URL=args.db_url, DEBUG=True)
    app.run(host=args.host, port=args.port, debug=app.config['DEBUG'])