Persist queue on disk for windows

This is a giant hack to make the windows client not lose the list of
tests it has to run whenever it needs rebooted, or a daemon there needs
restarted, or whatever (which happens pretty often). Instead of using
rabbitmq directly from windows (which has been a source of much pain),
we use a queue listener on the master to get windows things from
rabbitmq and put them into an sqlite database. Then, there's a web
service (still running on the master) that serves those up one at a
time. Finally, on windows, instead of running the worker as a daemon
that does its thing, we'll run it as a cron job (every 30 seconds or so,
just so we don't get too far behind) that calls out to the web service,
runs one test, and then shuts down. This should keep us from screwing
things up TOO badly.
This commit is contained in:
Nick Hurley 2013-02-26 14:15:40 -08:00
Родитель 3ff28fd30f
Коммит 490865eeb2
5 изменённых файлов: 167 добавлений и 0 удалений

Просмотреть файл

@ -80,6 +80,16 @@ keep = 50
# corresponds to 15 minutes
timeout = 900
[mqproxy]
# Where to keep the proxy database for windows queue entries
db = /Users/hurley/src/stoneridge/testroot/mqproxy.db
# What port to listen on for the server
port = 8888
# What URL the client should hit to get requests
url = http://127.0.0.1:8888/get_next
[machine]
# The type of os this is running on, may be mac, linux, or windows
os = mac

Просмотреть файл

@ -1,3 +1,4 @@
bottle==0.11.6
daemonize==1.3
datazilla==1.1
httplib2==0.7.7

38
windows/srmqproxy.py Normal file
Просмотреть файл

@ -0,0 +1,38 @@
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
import json
import logging
import sqlite3
import stoneridge
class StoneRidgeMQProxy(stoneridge.QueueListener):
def setup(self):
self.dbfile = stoneridge.get_config('mqproxy', 'db')
self.conn = sqlite3.connect(self.dbfile)
def handle(self, **kwargs):
logging.debug('Got new windows queue entry: %s' % (kwargs,))
cursor = self.conn.cursor()
config = json.dumps(kwargs)
cursor.execute('INSERT INTO runs (config, done) VALUES (?, ?)',
(config, False))
self.conn.commit()
logging.debug('Inserted into persistent queue')
def daemon():
proxy = StoneRidgeMQProxy(stoneridge.CLIENT_QUEUES['windows'])
proxy.connrun()
@stoneridge.main
def main():
parser = stoneridge.DaemonArgumentParser()
parser.parse_args()
parser.start_daemon(daemon)

67
windows/srwebmq.py Normal file
Просмотреть файл

@ -0,0 +1,67 @@
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
import bottle
import logging
import sqlite3
import sys
import stoneridge
conn = None # Persistent connection for sqlite file
class StreamLogger(object):
"""Redirect a stream to a logger
"""
def __init__(self, logger):
self.logger = logger
def write(self, buf):
for line in buf.rstrip().splitlines():
self.logger.log(logging.DEBUG, line.rstrip())
@bottle.route('/get_next')
def get_next():
cur = conn.cursor()
cur.execute('SELECT id, config FROM runs WHERE done = ? ORDER BY id ASC '
'LIMIT 1', (False,))
res = cur.fetchall()
if res:
id_, config = res[0]
logging.debug('Found entry %s' % (config,))
cur.execute('UPDATE runs SET done = ? WHERE id = ?', (True, id_))
conn.commit()
else:
logging.debug('No entries waiting')
config = ''
logging.debug('Returning %s' % (config,))
return config
def daemon():
global conn
dbfile = stoneridge.get_config('mqproxy', 'db')
conn = sqlite3.connect(dbfile)
# Do some nasty hackery to make sure everything bottle prints goes to our
# log, too
streamlogger = StreamLogger(logging.getLogger())
sys.stdout = sys.stderr = streamlogger
bottle._stdout = bottle._stderr = streamlogger.write
port = stoneridge.get_config_int('mqproxy', 'port')
bottle.run(host='0.0.0.0', port=port)
@stoneridge.main
def main():
parser = stoneridge.DaemonArgumentParser()
parser.parse_args()
parser.start_daemon(daemon)

51
windows/srwebworker.py Normal file
Просмотреть файл

@ -0,0 +1,51 @@
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
import json
import logging
import requests
import srworker
import stoneridge
class StoneRidgeWebWorker(srworker.StoneRidgeWorker):
def __init__(self):
self.url = stoneridge.get_config('mqproxy', 'url')
self.setup()
def run(self):
res = requests.get(self.url)
if res.status_code != 200:
logging.error('Got non-200 response: %s %s (text %s)' %
(res.status_code, res.reason, res.text))
return
logging.debug('Got response %s' % (res.text,))
if not res.text:
logging.debug('No entries waiting!')
return
args = json.loads(res.text)
logging.debug('Handling request')
self.handle(**args)
logging.debug('Done')
@stoneridge.main
def main():
parser = stoneridge.ArgumentParser()
parser.parse_args()
worker = StoneRidgeWebWorker()
try:
worker.run()
except:
logging.exception('Error running this time')