From 4db19e8916f2d77fff9039c7581caba7287bd818 Mon Sep 17 00:00:00 2001 From: Nick Hurley Date: Tue, 19 Feb 2013 09:49:15 -0800 Subject: [PATCH 1/6] Get rid of some unused imports --- srrunner.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/srrunner.py b/srrunner.py index cfe0735..a1afb81 100644 --- a/srrunner.py +++ b/srrunner.py @@ -4,12 +4,9 @@ # obtain one at http://mozilla.org/MPL/2.0/. import glob -import json import logging import os -import platform import subprocess -import sys import stoneridge From 93a609577d8b469d778ca231f16fd577d8ada834 Mon Sep 17 00:00:00 2001 From: Nick Hurley Date: Tue, 19 Feb 2013 09:49:26 -0800 Subject: [PATCH 2/6] Use packet-buffering for tcpdump This should make sure we have all the packets from the test saved off, instead of possibly having some (or all) missing data. --- srrunner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/srrunner.py b/srrunner.py index a1afb81..1c6987c 100644 --- a/srrunner.py +++ b/srrunner.py @@ -118,8 +118,9 @@ class StoneRidgeRunner(object): logging.debug('Not running processes: in unit test mode') else: if tcpdump_exe and tcpdump_if: - tcpdump = subprocess.Popen([tcpdump_exe, '-s', '2000', '-w', - tcpdump_output, '-i', tcpdump_if], + tcpdump = subprocess.Popen([tcpdump_exe, '-s', '2000', '-U', + '-w', tcpdump_output, + '-i', tcpdump_if], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) res, xpcshell_out = stoneridge.run_xpcshell(args) From 24f5e49f607a90fbe28c8a2bc6ef00e529e4912d Mon Sep 17 00:00:00 2001 From: Nick Hurley Date: Tue, 19 Feb 2013 09:59:47 -0800 Subject: [PATCH 3/6] Eliminate unused import --- srworker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/srworker.py b/srworker.py index f7467e7..7b0a87b 100644 --- a/srworker.py +++ b/srworker.py @@ -6,7 +6,6 @@ import logging import os import subprocess -import sys import tempfile import stoneridge From 3d40994f5b7e1a6a62f2f620b9cb3d8da9501286 Mon Sep 17 00:00:00 2001 From: Nick Hurley Date: Tue, 19 Feb 2013 10:00:05 -0800 Subject: [PATCH 4/6] Always upload This way we will have info even about failures in one central place. --- srworker.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/srworker.py b/srworker.py index 7b0a87b..4b26760 100644 --- a/srworker.py +++ b/srworker.py @@ -62,6 +62,7 @@ class StoneRidgeWorker(stoneridge.RpcHandler): srxpcout = os.path.basename(tempfile.mktemp()) self.srnetconfig = netconfig + self.uploaded = False self.archive_on_failure = True self.cleaner_called = False self.procno = 1 @@ -82,6 +83,7 @@ class StoneRidgeWorker(stoneridge.RpcHandler): f.write('srid = %s\n' % (srid,)) self.logger.debug('srnetconfig: %s' % (self.srnetconfig,)) + self.logger.debug('uploaded: %s' % (self.uploaded,)) self.logger.debug('archive on failure: %s' % (self.archive_on_failure,)) self.logger.debug('cleaner called: %s' % (self.cleaner_called,)) self.logger.debug('procno: %s' % (self.procno,)) @@ -104,6 +106,7 @@ class StoneRidgeWorker(stoneridge.RpcHandler): def reset(self): self.srnetconfig = None + self.uploaded = False self.archive_on_failure = True self.cleaner_called = True self.procno = -1 @@ -164,6 +167,12 @@ class StoneRidgeWorker(stoneridge.RpcHandler): self.run_process('cleaner') except StoneRidgeException as e: pass + if not self.uploaded: + self.uploaded = True + try: + self.run_process('uploader') + except StoneRidgeException: + pass # Finally, bubble the error up to the top level self.do_error(stage) @@ -183,6 +192,8 @@ class StoneRidgeWorker(stoneridge.RpcHandler): self.run_process('collator') + self.uploaded = True + self.run_process('uploader') self.archive_on_failure = False From e1921be09a92c58da5c7d9e5203c59c1456da7ee Mon Sep 17 00:00:00 2001 From: Nick Hurley Date: Tue, 19 Feb 2013 10:08:06 -0800 Subject: [PATCH 5/6] Forget RPC for now For some reason, things get hung up returning the response, with no debugging info available. This is crap, and the RPC is less important for the first push (though I still want to find a way to do it) --- srmaster.py | 6 ++-- srscheduler.py | 24 ++++--------- srworker.py | 6 ---- stoneridge.py | 97 ++------------------------------------------------ 4 files changed, 12 insertions(+), 121 deletions(-) diff --git a/srmaster.py b/srmaster.py index f0227ae..370c852 100644 --- a/srmaster.py +++ b/srmaster.py @@ -16,11 +16,11 @@ class StoneRidgeMaster(stoneridge.QueueListener): def setup(self): self.queues = { 'broadband': stoneridge.QueueWriter( - stoneridge.NETCONFIG_QUEUES['broadband']['incoming']), + stoneridge.NETCONFIG_QUEUES['broadband']), 'umts': stoneridge.QueueWriter( - stoneridge.NETCONFIG_QUEUES['umts']['incoming']), + stoneridge.NETCONFIG_QUEUES['umts']), 'gsm': stoneridge.QueueWriter( - stoneridge.NETCONFIG_QUEUES['gsm']['incoming']) + stoneridge.NETCONFIG_QUEUES['gsm']) } self.logdir = stoneridge.get_config('stoneridge', 'logs') self.config = stoneridge.get_config_file() diff --git a/srscheduler.py b/srscheduler.py index fdab9bd..052a220 100644 --- a/srscheduler.py +++ b/srscheduler.py @@ -9,17 +9,13 @@ import stoneridge class StoneRidgeScheduler(stoneridge.QueueListener): - def setup(self, rpc_queue, netconfig): - self.rpc_queue = rpc_queue + def setup(self, netconfig): self.netconfig = netconfig self.runners = { - 'linux': stoneridge.RpcCaller(stoneridge.CLIENT_QUEUES['linux'], - self.rpc_queue), - 'mac': stoneridge.RpcCaller(stoneridge.CLIENT_QUEUES['mac'], - self.rpc_queue), - 'windows': stoneridge.RpcCaller(stoneridge.CLIENT_QUEUES['windows'], - self.rpc_queue) + 'linux': stoneridge.QueueWriter(stoneridge.CLIENT_QUEUES['linux']) + 'mac': stoneridge.QueueWriter(stoneridge.CLIENT_QUEUES['mac']) + 'windows': stoneridge.QueueWriter(stoneridge.CLIENT_QUEUES['windows']) } def handle(self, srid, operating_systems, tstamp): @@ -30,19 +26,11 @@ class StoneRidgeScheduler(stoneridge.QueueListener): continue logging.debug('Calling to run %s on %s' % (srid, o)) - res = runner(srid=srid, netconfig=self.netconfig, tstamp=tstamp) - - if res['ok']: - logging.debug('Run of %s on %s succeeded' % (srid, o)) - else: - logging.error('Run of %s on %s failed: %s' % (srid, o, - res['msg'])) + runner.enqueue(srid=srid, netconfig=self.netconfig, tstamp=tstamp) def daemon(netconfig): - queues = stoneridge.NETCONFIG_QUEUES[netconfig] - - scheduler = StoneRidgeScheduler(queues['incoming'], rpc_queue=queues['rpc'], + scheduler = StoneRidgeScheduler(stoneridge.NETCONFIG_QUEUES[netconfig], netconfig=netconfig) scheduler.run() diff --git a/srworker.py b/srworker.py index 4b26760..8205568 100644 --- a/srworker.py +++ b/srworker.py @@ -91,19 +91,13 @@ class StoneRidgeWorker(stoneridge.RpcHandler): self.logger.debug('logdir: %s' % (self.logdir,)) self.logger.debug('runconfig: %s' % (self.runconfig,)) - res = {'ok': True} - try: self.run_test() except StoneRidgeException as e: self.logger.exception(e) - res['ok'] = False - res['msg'] = str(e) self.reset() - return res - def reset(self): self.srnetconfig = None self.uploaded = False diff --git a/stoneridge.py b/stoneridge.py index 7c6aed5..6878896 100644 --- a/stoneridge.py +++ b/stoneridge.py @@ -32,9 +32,9 @@ OPERATING_SYSTEMS = ('linux', 'mac', 'windows') INCOMING_QUEUE = 'sr_incoming' OUTGOING_QUEUE = 'sr_outgoing' NETCONFIG_QUEUES = { - 'broadband': {'incoming': 'sr_nc_broadband', 'rpc': 'sr_nc_broadband_rpc'}, - 'umts': {'incoming': 'sr_nc_umts', 'rpc': 'sr_nc_umts_rpc'}, - 'gsm': {'incoming': 'sr_nc_gsm', 'rpc': 'sr_nc_gsm_rpc'} + 'broadband': 'sr_nc_broadband', + 'umts': 'sr_nc_umts', + 'gsm': 'sr_nc_gsm' } CLIENT_QUEUES = { 'linux': 'sr_ct_linux', @@ -585,94 +585,3 @@ def enqueue(nightly=True, ldap='', sha='', netconfigs=None, writer = QueueWriter(INCOMING_QUEUE) writer.enqueue(nightly=nightly, ldap=ldap, sha=sha, netconfigs=netconfigs, operating_systems=operating_systems, srid=srid, attempt=attempt) - - -class RpcCaller(object): - """Used to call remote functions via the stone ridge mq of choice. - """ - def __init__(self, outgoing_queue, incoming_queue): - self._host = get_config('stoneridge', 'mqhost') - self._outgoing_queue = outgoing_queue - self._incoming_queue = incoming_queue - - def _on_rpc_done(self, channel, method, properties, body): - """The callback that is called when the remote function call - is complete. - """ - logging.debug('RpcCaller got callback') - logging.debug('Body: %s' % (body,)) - logging.debug('Correlation id: %s' % (properties.correlation_id,)) - if self._srid == properties.correlation_id: - logging.debug('Correlation ID matches.') - self._response = body - else: - logging.debug('No match for correlation ID. Ignoring.') - - def __call__(self, **msg): - if 'srid' not in msg: - logging.error('Attempted to make an RPC call without an srid!') - return None - - self._response = None - self._srid = msg['srid'] - - logging.debug('Making RPC call with correlation id %s' % (self._srid,)) - logging.debug('Sending to: %s' % (self._outgoing_queue,)) - logging.debug('Reply to: %s' % (self._incoming_queue,)) - - params = pika.ConnectionParameters(host=self._host) - connection = pika.BlockingConnection(params) - channel = connection.channel() - - # Send out our RPC request. - properties = pika.BasicProperties(reply_to=self._incoming_queue, - correlation_id=self._srid) - body = json.dumps(msg) - channel.basic_publish(exchange='', - routing_key=self._outgoing_queue, body=body, - properties=properties) - - # Now start waiting on an answer from the RPC handler. - channel.basic_consume(self._on_rpc_done, no_ack=True, - queue=self._incoming_queue) - - while self._response is None: - connection.process_data_events() - - # Got our response. We no longer need this connection, so get rid - # of it (save the file descriptors!). - connection.close() - - self._srid = None - - return json.loads(self._response) - - -class RpcHandler(QueueListener): - """Like stoneridge.QueueListener, but for programs that service RPC instead - of asynchronous queue events. - """ - def handle(self, **kwargs): - """Just like stoneridge.QueueListener.handle, except the return value - from this must be serializable as a JSON string. - """ - raise NotImplementedError - - def _handle(self, channel, method, properties, body): - """Internal message callback to perform the RPC and return the result - to the caller. - """ - msg = json.loads(body) - logging.debug('RPC Handler got message %s' % (msg,)) - res = self.handle(**msg) - - body = json.dumps(res) - logging.debug('Returning RPC result %s' % (res,)) - logging.debug('Returning to %s' % (properties.reply_to,)) - logging.debug('Returning for %s' % (properties.correlation_id,)) - res_properties = pika.BasicProperties( - correlation_id=properties.correlation_id) - channel.basic_publish(exchange='', routing_key=properties.reply_to, - properties=res_properties, body=body) - - channel.basic_ack(delivery_tag=method.delivery_tag) From 3db5a11f30b09cc7091cd5062c89e0891a111342 Mon Sep 17 00:00:00 2001 From: Nick Hurley Date: Tue, 19 Feb 2013 10:09:29 -0800 Subject: [PATCH 6/6] Don't use promiscuous mode in tcpdump This is in an attempt to limit traffic to just our own stuff (which should work fine, since nothing else should be putting the interface in promiscuous mode). --- srrunner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srrunner.py b/srrunner.py index 1c6987c..bb84317 100644 --- a/srrunner.py +++ b/srrunner.py @@ -119,7 +119,7 @@ class StoneRidgeRunner(object): else: if tcpdump_exe and tcpdump_if: tcpdump = subprocess.Popen([tcpdump_exe, '-s', '2000', '-U', - '-w', tcpdump_output, + '-p', '-w', tcpdump_output, '-i', tcpdump_if], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)