stoneridge/stoneridge.py

425 строки
12 KiB
Python
Исходник Обычный вид История

# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
2012-05-01 22:42:35 +04:00
import argparse
import ConfigParser
import copy
import inspect
import json
import logging
import os
import platform
import subprocess
import sys
import traceback

import pika
# Network configurations we have available. Each key is the internal/parameter
# name of a configuration; each value is its human-readable description.
NETCONFIGS = {
    'broadband': 'Modern Wired Broadband (Cable/ADSL)',
    'umts': 'Modern Cellular (UMTS)',
    'gsm': 'Legacy Cellular (GSM/EDGE)',
}
2012-05-01 22:42:35 +04:00
2013-01-05 03:52:03 +04:00
# RabbitMQ queue names
INCOMING_QUEUE = 'sr_incoming'
OUTGOING_QUEUE = 'sr_outgoing'

# Per network configuration: the queue for incoming work and the queue used
# for RPC, keyed by the same names as NETCONFIGS.
NETCONFIG_QUEUES = {
    'broadband': {
        'incoming': 'sr_nc_broadband',
        'rpc': 'sr_nc_broadband_rpc',
    },
    'umts': {
        'incoming': 'sr_nc_umts',
        'rpc': 'sr_nc_umts_rpc',
    },
    'gsm': {
        'incoming': 'sr_nc_gsm',
        'rpc': 'sr_nc_gsm_rpc',
    },
}

# One queue per client operating system.
CLIENT_QUEUES = {
    'linux': 'sr_ct_linux',
    'mac': 'sr_ct_mac',
    'windows': 'sr_ct_windows',
}
2013-01-05 03:52:03 +04:00
# Logging configuration
LOG_FMT = '%(asctime)s %(pathname)s:%(lineno)d %(levelname)s: %(message)s'

# Peek at the command line for --log as soon as this module is imported, so
# that file logging is set up before anything else runs.
# add_help=False keeps this preliminary parser from claiming -h/--help: with
# the default it would print a usage message mentioning only --log and exit
# during import. parse_known_args ignores every argument other than --log.
_parser = argparse.ArgumentParser(add_help=False)
_parser.add_argument('--log')
_args, _ = _parser.parse_known_args()
if _args.log:
    # Route everything from DEBUG upwards on the root logger to the file.
    _logger = logging.getLogger()
    _logger.setLevel(logging.DEBUG)
    _handler = logging.FileHandler(_args.log)
    _formatter = logging.Formatter(fmt=LOG_FMT)
    _handler.setFormatter(_formatter)
    _logger.addHandler(_handler)
def log(msg):
    """Log msg at DEBUG level. Does nothing unless --log was given on the
    command line.
    """
    if not _args.log:
        return
    logging.debug(msg)
def log_exc(msg):
    """Log msg together with the exception currently being handled. Does
    nothing unless --log was given on the command line.
    """
    if not _args.log:
        return
    logging.exception(msg)
def main(_main):
    """Decorator marking _main as the function to run when the defining
    module is executed as a script.

    If the caller's __name__ is '__main__', _main is run immediately: the
    command line is logged, and the process exits with status 0 on success.
    If _main raises, the traceback is logged and printed to stderr and the
    process exits with status 1. In every other case _main is returned
    untouched.
    """
    # Frame of whoever applied the decorator; its locals are the module
    # globals when the decorator is used at module level.
    caller_frame = inspect.stack()[1][0]
    caller_name = caller_frame.f_locals.get('__name__', None)

    if caller_name != '__main__':
        return _main

    log(' '.join(sys.argv))
    try:
        _main()
    except Exception:
        log_exc('EXCEPTION')
        exc_type, exc_value, exc_tb = sys.exc_info()
        traceback.print_exception(exc_type, exc_value, exc_tb, None,
                                  sys.stderr)
        sys.exit(1)

    log('FINISHED')
    sys.exit(0)
_cp = None  # Config parser shared by all get_config calls, created on first use
_srconf = None  # Stoneridge config file path, set by ArgumentParser.parse_args
_runconf = None  # Run config file path, set by TestRunArgumentParser.parse_args


def get_config(section, option, default=None):
    """Read a config entry from the stoneridge ini files.

    The files named by _srconf and _runconf (whichever are set) are loaded
    the first time this is called; later calls reuse the same parser.

    Returns the value of option in section, or default when either the
    section or the option does not exist.
    """
    global _cp

    logging.debug('reading %s.%s (default %s)' % (section, option, default))

    if _cp is None:
        _cp = ConfigParser.SafeConfigParser()

        if _srconf:
            logging.debug('loading stoneridge config file %s' % (_srconf,))
            _cp.read(_srconf)

        if _runconf:
            logging.debug('loading run config file %s' % (_runconf,))
            _cp.read(_runconf)

    try:
        val = _cp.get(section, option)
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
        # Both exception classes must be given as a tuple; the form
        # "except A, B as e" is not valid syntax.
        logging.debug('unable to find %s.%s, returning default %s' %
                      (section, option, default))
        return default

    logging.debug('found %s.%s, returning %s' % (section, option, val))
    return val
_xpcshell_environ = None  # Environment passed to xpcshell, built on first use


def run_xpcshell(args, stdout=subprocess.PIPE):
    """Run xpcshell with the appropriate args.

    args is the sequence of command-line arguments for xpcshell; stdout is
    passed straight through to subprocess.Popen. stderr is merged into
    stdout, and the process runs with the configured bin directory as its
    working directory.

    Returns a (exit status, proc.stdout) tuple, or (None, None) when the
    'bin' option of the 'run' config section is not set.
    """
    global _xpcshell_environ

    bindir = get_config('run', 'bin')
    if bindir is None:
        return (None, None)

    if _xpcshell_environ is None:
        # Copy our own environment once and put the bin directory at the
        # front of LD_LIBRARY_PATH.
        _xpcshell_environ = copy.copy(os.environ)
        ldlibpath = _xpcshell_environ.get('LD_LIBRARY_PATH')
        if ldlibpath:
            ldlibpath = os.path.pathsep.join([bindir, ldlibpath])
        else:
            ldlibpath = bindir
        _xpcshell_environ['LD_LIBRARY_PATH'] = ldlibpath

    # The original referenced an undefined name `xpcshell` here.
    # NOTE(review): assumes the binary is named 'xpcshell' and lives in the
    # configured bin directory - confirm against the deployment layout.
    xpcshell = os.path.join(bindir, 'xpcshell')

    # list() so that callers may pass any sequence, not only a list.
    xpcargs = [xpcshell] + list(args)
    proc = subprocess.Popen(xpcargs, stdout=stdout,
                            stderr=subprocess.STDOUT, cwd=bindir,
                            env=_xpcshell_environ)
    # NOTE(review): the subprocess documentation warns that wait() combined
    # with stdout=PIPE can deadlock if the child produces enough output to
    # fill the pipe buffer.
    res = proc.wait()
    return (res, proc.stdout)
_xpcoutdir = None  # Cached result of get_xpcshell_output_directory


def get_xpcshell_output_directory():
    """Get the directory where xpcshell output will be placed.

    Runs a small script under xpcshell that dumps the path of its "TmpD"
    directory, then joins that path with the 'xpcoutleaf' option of the 'run'
    config section. The result is cached after the first success.

    Returns the directory path, or None if the temporary directory could not
    be read from xpcshell's output.
    """
    global _xpcoutdir

    if _xpcoutdir is None:
        xpcshell_tmp_dir = None
        _, stdout = run_xpcshell(['-e',
            'dump("SR-TMP-DIR:" + '
            ' Components.classes["@mozilla.org/file/directory_service;1"]'
            ' .getService(Components.interfaces.nsIProperties)'
            ' .get("TmpD", Components.interfaces.nsILocalFile)'
            ' .path + "\\n");'
            'quit(0);'])

        # run_xpcshell hands back None for stdout when it could not run, so
        # fall back to an empty sequence instead of iterating None.
        for line in stdout or []:
            if line.startswith('SR-TMP-DIR:'):
                xpcshell_tmp_dir = line.strip().split(':', 1)[1]

        if xpcshell_tmp_dir is None:
            # TODO - maybe raise exception?
            return None

        xpcoutleaf = get_config('run', 'xpcoutleaf')
        # The original joined an undefined name (xpctmp) here.
        _xpcoutdir = os.path.join(xpcshell_tmp_dir, xpcoutleaf)

    return _xpcoutdir
_os_version = None  # Cached result of get_os_version


def get_os_version():
    """Return the OS version in use.

    The 'os' option of the 'machine' config section selects which platform
    query is made; any value other than 'linux', 'mac' or 'windows' yields
    'Unknown'. The answer is computed once and cached.
    """
    global _os_version

    if _os_version is not None:
        return _os_version

    os_name = get_config('machine', 'os')
    if os_name == 'linux':
        # Distribution name and version, space separated
        version = ' '.join(platform.linux_distribution()[:2])
    elif os_name == 'mac':
        version = platform.mac_ver()[0]
    elif os_name == 'windows':
        version = platform.win32_ver()[1]
    else:
        version = 'Unknown'

    _os_version = version
    return _os_version
# Single-character identifier for each network configuration
_netconfig_ids = {
    'broadband': '0',
    'umts': '1',
    'gsm': '2',
}

# Single-character identifier for each operating system
_os_ids = {
    'windows': 'w',
    'linux': 'l',
    'mac': 'm',
}

_buildid_suffix = None  # Cached result of get_buildid_suffix


def get_buildid_suffix():
    """Return the suffix to be used to uniquify the build id.

    The suffix is the OS identifier followed by the network configuration
    identifier, taken from the 'os' option of the 'machine' section and the
    'netconfig' option of the 'run' section. It is computed once and cached.

    Raises KeyError if either configured value has no identifier.
    """
    global _buildid_suffix

    if _buildid_suffix is None:
        # Only consult the config on a cache miss; the values are not needed
        # once the suffix has been computed.
        os_name = get_config('machine', 'os')
        current_netconfig = get_config('run', 'netconfig')
        _buildid_suffix = _os_ids[os_name] + _netconfig_ids[current_netconfig]

    return _buildid_suffix
2013-01-10 02:48:20 +04:00
def run_process(*args, **kwargs):
    """Run a python process under the stoneridge environment.

    args are the arguments handed to the python interpreter (sys.executable);
    the first one is used as the process name in log messages. The only
    keyword argument accepted is logger, an object with debug() and error()
    methods, which defaults to the logging module.

    The combined stdout/stderr of the process is logged. Raises
    subprocess.CalledProcessError (after logging it) if the process exits
    with a non-zero status, and TypeError on an unknown keyword argument.
    """
    # "def run_process(*args, logger=logging)" is not valid Python 2 syntax,
    # so the keyword is pulled out of **kwargs by hand.
    logger = kwargs.pop('logger', logging)
    if kwargs:
        raise TypeError('run_process() got unexpected keyword arguments: %s' %
                        (', '.join(sorted(kwargs)),))

    procname = args[0]
    # args is a tuple; it has to be converted before being added to a list.
    command = [sys.executable] + list(args)
    logger.debug('Running %s' % (procname,))
    logger.debug(' '.join(command))
    try:
        proc_stdout = subprocess.check_output(command,
                                              stderr=subprocess.STDOUT)
        logger.debug(proc_stdout)
        logger.debug('SUCCEEDED: %s' % (procname,))
    except subprocess.CalledProcessError as e:
        logger.error('FAILED: %s (%s)' % (procname, e.returncode))
        logger.error(e.output)
        raise  # Do this in case caller has any special handling
2012-09-25 22:27:22 +04:00
class ArgumentParser(argparse.ArgumentParser):
    """An argument parser for stone ridge programs that handles the arguments
    required by all of them.
    """
    def __init__(self, **kwargs):
        """Create the parser and register the required --config and --log
        options. kwargs are passed on to argparse.ArgumentParser.
        """
        argparse.ArgumentParser.__init__(self, **kwargs)

        self.add_argument('--config', dest='_sr_config_', required=True,
                          help='Configuration file')
        self.add_argument('--log', dest='_sr_log_', default=None,
                          required=True, help='File to place log info in')

    def parse_args(self, args=None, namespace=None):
        """Parse the command line and remember the stoneridge config file.

        Takes the same arguments as argparse.ArgumentParser.parse_args, so
        the argument list may be given positionally or by keyword. The value
        of --config is stored in the module-level _srconf for get_config to
        load. Returns the parsed namespace.
        """
        global _srconf

        parsed = argparse.ArgumentParser.parse_args(self, args=args,
                                                    namespace=namespace)

        _srconf = parsed._sr_config_
        logging.debug('_srconf: %s' % (_srconf,))

        return parsed
class TestRunArgumentParser(ArgumentParser):
    """Like stoneridge.ArgumentParser, but adds arguments specific for programs
    that are run as part of a test run.
    """
    def __init__(self, **kwargs):
        """Create the parser and register the required --runconfig option on
        top of those added by ArgumentParser.
        """
        ArgumentParser.__init__(self, **kwargs)

        self.add_argument('--runconfig', dest='_sr_runconfig_', required=True,
                          help='Run-specific configuration file')

    def parse_args(self, args=None, namespace=None):
        """Parse the command line and remember the run config file.

        Takes the same arguments as argparse.ArgumentParser.parse_args, so
        the argument list may be given positionally or by keyword. The value
        of --runconfig is stored in the module-level _runconf for get_config
        to load. Returns the parsed namespace.
        """
        global _runconf

        # Passed by keyword so this works whichever way the parent class
        # declares its parse_args parameters.
        parsed = ArgumentParser.parse_args(self, args=args,
                                           namespace=namespace)

        _runconf = parsed._sr_runconfig_
        logging.debug('_runconf: %s' % (_runconf,))

        return parsed
class QueueListener(object):
    """A class to be used as the base for stone ridge daemons that need to
    respond to entries on a queue.
    """
    def __init__(self, host, queue, **kwargs):
        """Remember the host and queue to listen on, then call setup with
        any extra keyword arguments.
        """
        self._host = host
        self._queue = queue
        self._params = pika.ConnectionParameters(host=host)
        self._args = kwargs
        self.setup(**kwargs)

    def setup(self, **kwargs):
        """Used for class-specific things that would normally go in __init__.
        """
        pass

    def handle(self, **kwargs):
        """The callback that is called when a message is received on the queue.
        All subclasses must override this. Nothing is done with the returned
        value.
        """
        raise NotImplementedError

    def _handle(self, channel, method, properties, body):
        """Internal callback for when a message is received. Deserializes the
        message and calls handle. Once handle succeeds, the message is
        acknowledged.
        """
        msg = json.loads(body)
        self.handle(**msg)
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def run(self):
        """Main event loop for a queue listener.

        Raises Exception if no queue was set. The connection is closed again
        when consuming stops, whether normally or because of an exception.
        """
        if self._queue is None:
            raise Exception('You must set queue for %s' % (type(self),))

        connection = pika.BlockingConnection(self._params)
        try:
            channel = connection.channel()

            # Take one unacknowledged message at a time.
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(self._handle, queue=self._queue)

            channel.start_consuming()
        finally:
            # Don't leak the connection if consuming ends with an exception;
            # skip the close if the connection is already gone so the
            # original error is not masked.
            if connection.is_open:
                connection.close()
class QueueWriter(object):
    """Used when someone needs to write to a stone ridge queue.
    """
    def __init__(self, host, queue):
        """Remember the host to connect to and the queue to write to.
        """
        self._host = host
        self._params = pika.ConnectionParameters(host=host)
        self._queue = queue

    def enqueue(self, **msg):
        """Place a message on the queue. The message is serialized as a JSON
        string before being placed on the queue.

        A new connection is opened for every message and closed afterwards.
        """
        # Serialize first so a message that cannot be encoded never opens a
        # connection at all.
        body = json.dumps(msg)

        connection = pika.BlockingConnection(self._params)
        try:
            channel = connection.channel()
            channel.basic_publish(
                exchange='', routing_key=self._queue, body=body,
                properties=pika.BasicProperties(delivery_mode=2))  # Durable
        finally:
            # Closing ensures the message is sent; doing it in finally also
            # keeps a failed publish from leaking the connection.
            if connection.is_open:
                connection.close()
2013-01-10 02:00:12 +04:00
2013-01-10 02:00:12 +04:00
class RpcCaller(object):
    """Used to call remote functions via the stone ridge mq of choice.
    """
    def __init__(self, host, outgoing_queue, incoming_queue):
        """Connect to host and start consuming replies.

        Requests are published to outgoing_queue; replies are read from
        incoming_queue.
        """
        self._host = host
        self._outgoing_queue = outgoing_queue
        self._incoming_queue = incoming_queue

        # State of the call in flight; set up before the consumer is
        # registered so _on_rpc_done never sees missing attributes.
        self._srid = None
        self._response = None

        params = pika.ConnectionParameters(host=host)
        self._connection = pika.BlockingConnection(params)
        # channel is a method: it must be called to obtain a channel object.
        self._channel = self._connection.channel()
        self._channel.basic_consume(self._on_rpc_done, no_ack=True,
                                    queue=self._incoming_queue)

    def _on_rpc_done(self, channel, method, properties, body):
        """The callback that is called when the remote function call
        is complete. Replies whose correlation id does not match the call in
        flight are ignored.
        """
        if self._srid == properties.correlation_id:
            self._response = body

    def __call__(self, **msg):
        """Send msg as an RPC request and block until the reply arrives.

        msg must contain an 'srid' entry, which is used as the correlation
        id. Returns the JSON-decoded reply, or None (after logging an error)
        if 'srid' is missing.
        """
        if 'srid' not in msg:
            logging.error('Attempted to make an RPC call without an srid!')
            return None

        self._response = None
        self._srid = msg['srid']

        properties = pika.BasicProperties(reply_to=self._incoming_queue,
                                          correlation_id=self._srid)
        body = json.dumps(msg)
        self._channel.basic_publish(exchange='',
                                    routing_key=self._outgoing_queue,
                                    body=body, properties=properties)

        # Pump the connection until _on_rpc_done stores the matching reply.
        while self._response is None:
            self._connection.process_data_events()

        return json.loads(self._response)
2013-01-10 02:48:20 +04:00
2013-01-10 02:48:20 +04:00
class RpcHandler(QueueListener):
    """Like stoneridge.QueueListener, but for programs that service RPC instead
    of asynchronous queue events.
    """
    def handle(self, **kwargs):
        """Same contract as stoneridge.QueueListener.handle, with one
        addition: whatever is returned must be serializable as a JSON string.
        Subclasses must override this.
        """
        raise NotImplementedError

    def _handle(self, channel, method, properties, body):
        """Internal message callback: decode the request, run handle on it,
        publish the JSON-encoded result to the caller's reply queue under the
        request's correlation id, then acknowledge the request.
        """
        request = json.loads(body)
        reply = json.dumps(self.handle(**request))

        reply_properties = pika.BasicProperties(
            correlation_id=properties.correlation_id)
        channel.basic_publish(exchange='',
                              routing_key=properties.reply_to,
                              properties=reply_properties,
                              body=reply)

        channel.basic_ack(delivery_tag=method.delivery_tag)