зеркало из https://github.com/mozilla/stoneridge.git
Merge branch 'master' into debugging_fixes
Conflicts: srworker.py
This commit is contained in:
Коммит
365bc52031
|
@ -16,11 +16,11 @@ class StoneRidgeMaster(stoneridge.QueueListener):
|
|||
def setup(self):
|
||||
self.queues = {
|
||||
'broadband': stoneridge.QueueWriter(
|
||||
stoneridge.NETCONFIG_QUEUES['broadband']['incoming']),
|
||||
stoneridge.NETCONFIG_QUEUES['broadband']),
|
||||
'umts': stoneridge.QueueWriter(
|
||||
stoneridge.NETCONFIG_QUEUES['umts']['incoming']),
|
||||
stoneridge.NETCONFIG_QUEUES['umts']),
|
||||
'gsm': stoneridge.QueueWriter(
|
||||
stoneridge.NETCONFIG_QUEUES['gsm']['incoming'])
|
||||
stoneridge.NETCONFIG_QUEUES['gsm'])
|
||||
}
|
||||
self.logdir = stoneridge.get_config('stoneridge', 'logs')
|
||||
self.config = stoneridge.get_config_file()
|
||||
|
|
|
@ -4,12 +4,9 @@
|
|||
# obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
import glob
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
import stoneridge
|
||||
|
||||
|
@ -111,8 +108,9 @@ class StoneRidgeRunner(object):
|
|||
logging.debug('Not running processes: in unit test mode')
|
||||
else:
|
||||
if tcpdump_exe and tcpdump_if:
|
||||
tcpdump = subprocess.Popen([tcpdump_exe, '-s', '2000', '-w',
|
||||
tcpdump_output, '-i', tcpdump_if],
|
||||
tcpdump = subprocess.Popen([tcpdump_exe, '-s', '2000', '-U',
|
||||
'-p', '-w', tcpdump_output,
|
||||
'-i', tcpdump_if],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
res, xpcshell_out = stoneridge.run_xpcshell(args)
|
||||
|
|
|
@ -9,17 +9,13 @@ import stoneridge
|
|||
|
||||
|
||||
class StoneRidgeScheduler(stoneridge.QueueListener):
|
||||
def setup(self, rpc_queue, netconfig):
|
||||
self.rpc_queue = rpc_queue
|
||||
def setup(self, netconfig):
|
||||
self.netconfig = netconfig
|
||||
|
||||
self.runners = {
|
||||
'linux': stoneridge.RpcCaller(stoneridge.CLIENT_QUEUES['linux'],
|
||||
self.rpc_queue),
|
||||
'mac': stoneridge.RpcCaller(stoneridge.CLIENT_QUEUES['mac'],
|
||||
self.rpc_queue),
|
||||
'windows': stoneridge.RpcCaller(stoneridge.CLIENT_QUEUES['windows'],
|
||||
self.rpc_queue)
|
||||
'linux': stoneridge.QueueWriter(stoneridge.CLIENT_QUEUES['linux'])
|
||||
'mac': stoneridge.QueueWriter(stoneridge.CLIENT_QUEUES['mac'])
|
||||
'windows': stoneridge.QueueWriter(stoneridge.CLIENT_QUEUES['windows'])
|
||||
}
|
||||
|
||||
def handle(self, srid, operating_systems, tstamp):
|
||||
|
@ -30,19 +26,11 @@ class StoneRidgeScheduler(stoneridge.QueueListener):
|
|||
continue
|
||||
|
||||
logging.debug('Calling to run %s on %s' % (srid, o))
|
||||
res = runner(srid=srid, netconfig=self.netconfig, tstamp=tstamp)
|
||||
|
||||
if res['ok']:
|
||||
logging.debug('Run of %s on %s succeeded' % (srid, o))
|
||||
else:
|
||||
logging.error('Run of %s on %s failed: %s' % (srid, o,
|
||||
res['msg']))
|
||||
runner.enqueue(srid=srid, netconfig=self.netconfig, tstamp=tstamp)
|
||||
|
||||
|
||||
def daemon(netconfig):
|
||||
queues = stoneridge.NETCONFIG_QUEUES[netconfig]
|
||||
|
||||
scheduler = StoneRidgeScheduler(queues['incoming'], rpc_queue=queues['rpc'],
|
||||
scheduler = StoneRidgeScheduler(stoneridge.NETCONFIG_QUEUES[netconfig],
|
||||
netconfig=netconfig)
|
||||
scheduler.run()
|
||||
|
||||
|
|
18
srworker.py
18
srworker.py
|
@ -6,7 +6,6 @@
|
|||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
import stoneridge
|
||||
|
@ -58,6 +57,7 @@ class StoneRidgeWorker(stoneridge.RpcHandler):
|
|||
info = os.path.join(srout, 'info.json')
|
||||
|
||||
self.srnetconfig = netconfig
|
||||
self.uploaded = False
|
||||
self.archive_on_failure = True
|
||||
self.procno = 1
|
||||
self.childlog = None
|
||||
|
@ -76,27 +76,23 @@ class StoneRidgeWorker(stoneridge.RpcHandler):
|
|||
f.write('srid = %s\n' % (srid,))
|
||||
|
||||
self.logger.debug('srnetconfig: %s' % (self.srnetconfig,))
|
||||
self.logger.debug('uploaded: %s' % (self.uploaded,))
|
||||
self.logger.debug('archive on failure: %s' % (self.archive_on_failure,))
|
||||
self.logger.debug('procno: %s' % (self.procno,))
|
||||
self.logger.debug('childlog: %s' % (self.childlog,))
|
||||
self.logger.debug('logdir: %s' % (self.logdir,))
|
||||
self.logger.debug('runconfig: %s' % (self.runconfig,))
|
||||
|
||||
res = {'ok': True}
|
||||
|
||||
try:
|
||||
self.run_test()
|
||||
except StoneRidgeException as e:
|
||||
self.logger.exception(e)
|
||||
res['ok'] = False
|
||||
res['msg'] = str(e)
|
||||
|
||||
self.reset()
|
||||
|
||||
return res
|
||||
|
||||
def reset(self):
|
||||
self.srnetconfig = None
|
||||
self.uploaded = False
|
||||
self.archive_on_failure = True
|
||||
self.procno = -1
|
||||
self.childlog = None
|
||||
|
@ -149,6 +145,12 @@ class StoneRidgeWorker(stoneridge.RpcHandler):
|
|||
self.run_process('archiver')
|
||||
except StoneRidgeException as e:
|
||||
pass
|
||||
if not self.uploaded:
|
||||
self.uploaded = True
|
||||
try:
|
||||
self.run_process('uploader')
|
||||
except StoneRidgeException:
|
||||
pass
|
||||
|
||||
# Finally, bubble the error up to the top level
|
||||
self.do_error(stage)
|
||||
|
@ -168,6 +170,8 @@ class StoneRidgeWorker(stoneridge.RpcHandler):
|
|||
|
||||
self.run_process('collator')
|
||||
|
||||
self.uploaded = True
|
||||
|
||||
self.run_process('uploader')
|
||||
|
||||
self.archive_on_failure = False
|
||||
|
|
|
@ -32,9 +32,9 @@ OPERATING_SYSTEMS = ('linux', 'mac', 'windows')
|
|||
INCOMING_QUEUE = 'sr_incoming'
|
||||
OUTGOING_QUEUE = 'sr_outgoing'
|
||||
NETCONFIG_QUEUES = {
|
||||
'broadband': {'incoming': 'sr_nc_broadband', 'rpc': 'sr_nc_broadband_rpc'},
|
||||
'umts': {'incoming': 'sr_nc_umts', 'rpc': 'sr_nc_umts_rpc'},
|
||||
'gsm': {'incoming': 'sr_nc_gsm', 'rpc': 'sr_nc_gsm_rpc'}
|
||||
'broadband': 'sr_nc_broadband',
|
||||
'umts': 'sr_nc_umts',
|
||||
'gsm': 'sr_nc_gsm'
|
||||
}
|
||||
CLIENT_QUEUES = {
|
||||
'linux': 'sr_ct_linux',
|
||||
|
@ -570,94 +570,3 @@ def enqueue(nightly=True, ldap='', sha='', netconfigs=None,
|
|||
writer = QueueWriter(INCOMING_QUEUE)
|
||||
writer.enqueue(nightly=nightly, ldap=ldap, sha=sha, netconfigs=netconfigs,
|
||||
operating_systems=operating_systems, srid=srid, attempt=attempt)
|
||||
|
||||
|
||||
class RpcCaller(object):
|
||||
"""Used to call remote functions via the stone ridge mq of choice.
|
||||
"""
|
||||
def __init__(self, outgoing_queue, incoming_queue):
|
||||
self._host = get_config('stoneridge', 'mqhost')
|
||||
self._outgoing_queue = outgoing_queue
|
||||
self._incoming_queue = incoming_queue
|
||||
|
||||
def _on_rpc_done(self, channel, method, properties, body):
|
||||
"""The callback that is called when the remote function call
|
||||
is complete.
|
||||
"""
|
||||
logging.debug('RpcCaller got callback')
|
||||
logging.debug('Body: %s' % (body,))
|
||||
logging.debug('Correlation id: %s' % (properties.correlation_id,))
|
||||
if self._srid == properties.correlation_id:
|
||||
logging.debug('Correlation ID matches.')
|
||||
self._response = body
|
||||
else:
|
||||
logging.debug('No match for correlation ID. Ignoring.')
|
||||
|
||||
def __call__(self, **msg):
|
||||
if 'srid' not in msg:
|
||||
logging.error('Attempted to make an RPC call without an srid!')
|
||||
return None
|
||||
|
||||
self._response = None
|
||||
self._srid = msg['srid']
|
||||
|
||||
logging.debug('Making RPC call with correlation id %s' % (self._srid,))
|
||||
logging.debug('Sending to: %s' % (self._outgoing_queue,))
|
||||
logging.debug('Reply to: %s' % (self._incoming_queue,))
|
||||
|
||||
params = pika.ConnectionParameters(host=self._host)
|
||||
connection = pika.BlockingConnection(params)
|
||||
channel = connection.channel()
|
||||
|
||||
# Send out our RPC request.
|
||||
properties = pika.BasicProperties(reply_to=self._incoming_queue,
|
||||
correlation_id=self._srid)
|
||||
body = json.dumps(msg)
|
||||
channel.basic_publish(exchange='',
|
||||
routing_key=self._outgoing_queue, body=body,
|
||||
properties=properties)
|
||||
|
||||
# Now start waiting on an answer from the RPC handler.
|
||||
channel.basic_consume(self._on_rpc_done, no_ack=True,
|
||||
queue=self._incoming_queue)
|
||||
|
||||
while self._response is None:
|
||||
connection.process_data_events()
|
||||
|
||||
# Got our response. We no longer need this connection, so get rid
|
||||
# of it (save the file descriptors!).
|
||||
connection.close()
|
||||
|
||||
self._srid = None
|
||||
|
||||
return json.loads(self._response)
|
||||
|
||||
|
||||
class RpcHandler(QueueListener):
|
||||
"""Like stoneridge.QueueListener, but for programs that service RPC instead
|
||||
of asynchronous queue events.
|
||||
"""
|
||||
def handle(self, **kwargs):
|
||||
"""Just like stoneridge.QueueListener.handle, except the return value
|
||||
from this must be serializable as a JSON string.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def _handle(self, channel, method, properties, body):
|
||||
"""Internal message callback to perform the RPC and return the result
|
||||
to the caller.
|
||||
"""
|
||||
msg = json.loads(body)
|
||||
logging.debug('RPC Handler got message %s' % (msg,))
|
||||
res = self.handle(**msg)
|
||||
|
||||
body = json.dumps(res)
|
||||
logging.debug('Returning RPC result %s' % (res,))
|
||||
logging.debug('Returning to %s' % (properties.reply_to,))
|
||||
logging.debug('Returning for %s' % (properties.correlation_id,))
|
||||
res_properties = pika.BasicProperties(
|
||||
correlation_id=properties.correlation_id)
|
||||
channel.basic_publish(exchange='', routing_key=properties.reply_to,
|
||||
properties=res_properties, body=body)
|
||||
|
||||
channel.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
|
Загрузка…
Ссылка в новой задаче