Bug 724595 Merge mozbase changes from Dec 31 to Feb 1 r=ctalbert

This commit is contained in:
Mark Cote 2012-02-08 11:07:19 -08:00
Родитель 580e841242
Коммит 63c9637b10
18 изменённых файлов: 1098 добавлений и 214 удалений

Просмотреть файл

@ -35,5 +35,6 @@
#
# ***** END LICENSE BLOCK *****
from mozhttpd import MozHttpd, MozRequestHandler
from mozhttpd import MozHttpd, Request, RequestHandler, main
from handlers import json_response
import iface

Просмотреть файл

@ -0,0 +1,52 @@
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is templeton.
#
# The Initial Developer of the Original Code is
# the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2012
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Mark Cote <mcote@mozilla.com>
# William Lachance <wlachance@mozilla.com>
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
try:
import json
except ImportError:
import simplejson as json
def json_response(func):
""" Translates results of 'func' into a JSON response. """
def wrap(*a, **kw):
(code, data) = func(*a, **kw)
json_data = json.dumps(data)
return (code, { 'Content-type': 'application/json',
'Content-Length': len(json_data) }, json_data)
return wrap

Просмотреть файл

@ -50,7 +50,11 @@ def _get_interface_ip(ifname):
)[20:24])
def get_lan_ip():
ip = socket.gethostbyname(socket.gethostname())
try:
ip = socket.gethostbyname(socket.gethostname())
except socket.gaierror: # for Mac OS X
ip = socket.gethostbyname(socket.gethostname() + ".local")
if ip.startswith("127.") and os.name != "nt":
interfaces = ["eth0", "eth1", "eth2", "wlan0", "wlan1", "wifi0", "ath0", "ath1", "ppp0"]
for ifname in interfaces:

Просмотреть файл

@ -39,33 +39,137 @@
import BaseHTTPServer
import SimpleHTTPServer
import errno
import logging
import threading
import posixpath
import socket
import sys
import os
import urllib
import urlparse
import re
from SocketServer import ThreadingMixIn
class EasyServer(ThreadingMixIn, BaseHTTPServer.HTTPServer):
allow_reuse_address = True
acceptable_errors = (errno.EPIPE, errno.ECONNABORTED)
def handle_error(self, request, client_address):
error = sys.exc_value
if ((isinstance(error, socket.error) and
isinstance(error.args, tuple) and
error.args[0] in self.acceptable_errors)
or
(isinstance(error, IOError) and
error.errno in self.acceptable_errors)):
pass # remote hang up before the result is sent
else:
logging.error(error)
class Request(object):
"""Details of a request."""
# attributes from urlsplit that this class also sets
uri_attrs = ('scheme', 'netloc', 'path', 'query', 'fragment')
def __init__(self, uri, headers, rfile=None):
self.uri = uri
self.headers = headers
parsed = urlparse.urlsplit(uri)
for i, attr in enumerate(self.uri_attrs):
setattr(self, attr, parsed[i])
try:
body_len = int(self.headers.get('Content-length', 0))
except ValueError:
body_len = 0
if body_len and rfile:
self.body = rfile.read(body_len)
else:
self.body = None
class RequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
class MozRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
docroot = os.getcwd() # current working directory at time of import
proxy_host_dirs = False
request = None
def _try_handler(self, method):
handlers = [handler for handler in self.urlhandlers
if handler['method'] == method]
for handler in handlers:
m = re.match(handler['path'], self.request.path)
if m:
(response_code, headerdict, data) = \
handler['function'](self.request, *m.groups())
self.send_response(response_code)
for (keyword, value) in headerdict.iteritems():
self.send_header(keyword, value)
self.end_headers()
self.wfile.write(data)
return True
return False
def parse_request(self):
retval = SimpleHTTPServer.SimpleHTTPRequestHandler.parse_request(self)
if '?' in self.path:
# ignore query string, otherwise SimpleHTTPRequestHandler
# will treat it as PATH_INFO for `translate_path`
self.path = self.path.split('?', 1)[0]
self.request = Request(self.path, self.headers, self.rfile)
return retval
def do_GET(self):
if not self._try_handler('GET'):
if self.docroot:
# don't include query string and fragment, and prepend
# host directory if required.
if self.request.netloc and self.proxy_host_dirs:
self.path = '/' + self.request.netloc + \
self.request.path
else:
self.path = self.request.path
SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
else:
self.send_response(404)
self.end_headers()
self.wfile.write('')
def do_POST(self):
# if we don't have a match, we always fall through to 404 (this may
# not be "technically" correct if we have a local file at the same
# path as the resource but... meh)
if not self._try_handler('POST'):
self.send_response(404)
self.end_headers()
self.wfile.write('')
def do_DEL(self):
# if we don't have a match, we always fall through to 404 (this may
# not be "technically" correct if we have a local file at the same
# path as the resource but... meh)
if not self._try_handler('DEL'):
self.send_response(404)
self.end_headers()
self.wfile.write('')
def translate_path(self, path):
path = path.strip('/').split()
if path == ['']:
path = []
path.insert(0, self.docroot)
return os.path.join(*path)
# this is taken from SimpleHTTPRequestHandler.translate_path(),
# except we serve from self.docroot instead of os.getcwd(), and
# parse_request()/do_GET() have already stripped the query string and
# fragment and mangled the path for proxying, if required.
path = posixpath.normpath(urllib.unquote(self.path))
words = path.split('/')
words = filter(None, words)
path = self.docroot
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in (os.curdir, os.pardir): continue
path = os.path.join(path, word)
return path
# I found on my local network that calls to this were timing out
# I believe all of these calls are from log_message
@ -76,22 +180,59 @@ class MozRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def log_message(self, format, *args):
pass
class MozHttpd(object):
def __init__(self, host="127.0.0.1", port=8888, docroot=os.getcwd(), handler_class=MozRequestHandler):
class MozHttpd(object):
"""
Very basic HTTP server class. Takes a docroot (path on the filesystem)
and a set of urlhandler dictionaries of the form:
{
'method': HTTP method (string): GET, POST, or DEL,
'path': PATH_INFO (regular expression string),
'function': function of form fn(arg1, arg2, arg3, ..., request)
}
and serves HTTP. For each request, MozHttpd will either return a file
off the docroot, or dispatch to a handler function (if both path and
method match).
Note that one of docroot or urlhandlers may be None (in which case no
local files or handlers, respectively, will be used). If both docroot or
urlhandlers are None then MozHttpd will default to serving just the local
directory.
MozHttpd also handles proxy requests (i.e. with a full URI on the request
line). By default files are served from docroot according to the request
URI's path component, but if proxy_host_dirs is True, files are served
from <self.docroot>/<host>/.
For example, the request "GET http://foo.bar/dir/file.html" would
(assuming no handlers match) serve <docroot>/dir/file.html if
proxy_host_dirs is False, or <docroot>/foo.bar/dir/file.html if it is
True.
"""
def __init__(self, host="127.0.0.1", port=8888, docroot=None,
urlhandlers=None, proxy_host_dirs=False):
self.host = host
self.port = int(port)
self.docroot = docroot
if not urlhandlers and not docroot:
self.docroot = os.getcwd()
self.proxy_host_dirs = proxy_host_dirs
self.httpd = None
self.urlhandlers = urlhandlers or []
class MozRequestHandlerInstance(handler_class):
class RequestHandlerInstance(RequestHandler):
docroot = self.docroot
urlhandlers = self.urlhandlers
proxy_host_dirs = self.proxy_host_dirs
self.handler_class = MozRequestHandlerInstance
self.handler_class = RequestHandlerInstance
def start(self, block=False):
"""
start the server. If block is True, the call will not return.
Start the server. If block is True, the call will not return.
If block is False, the server will be started on a separate thread that
can be terminated by a call to .stop()
"""
@ -102,35 +243,14 @@ class MozHttpd(object):
self.server = threading.Thread(target=self.httpd.serve_forever)
self.server.setDaemon(True) # don't hang on exit
self.server.start()
def testServer(self):
fileList = os.listdir(self.docroot)
filehandle = urllib.urlopen('http://%s:%s/?foo=bar&fleem=&foo=fleem' % (self.host, self.port))
data = filehandle.readlines()
filehandle.close()
retval = True
for line in data:
found = False
# '@' denotes a symlink and we need to ignore it.
webline = re.sub('\<[a-zA-Z0-9\-\_\.\=\"\'\/\\\%\!\@\#\$\^\&\*\(\) ]*\>', '', line.strip('\n')).strip('/').strip().strip('@')
if webline != "":
if webline == "Directory listing for":
found = True
else:
for fileName in fileList:
if fileName == webline:
found = True
if not found:
retval = False
print >> sys.stderr, "NOT FOUND: " + webline.strip()
return retval
def stop(self):
if self.httpd:
self.httpd.shutdown()
### FIXME: There is no shutdown() method in Python 2.4...
try:
self.httpd.shutdown()
except AttributeError:
pass
self.httpd = None
__del__ = stop
@ -150,9 +270,6 @@ def main(args=sys.argv[1:]):
parser.add_option('-d', '--docroot', dest='docroot',
default=os.getcwd(),
help="directory to serve files from [DEFAULT: %default]")
parser.add_option('--test', dest='test',
action='store_true', default=False,
help='run the tests and exit')
options, args = parser.parse_args(args)
if args:
parser.print_help()
@ -160,14 +277,10 @@ def main(args=sys.argv[1:]):
# create the server
kwargs = options.__dict__.copy()
test = kwargs.pop('test')
server = MozHttpd(**kwargs)
if test:
server.start()
server.testServer()
else:
server.start(block=True)
print "Serving '%s' at %s:%s" % (server.docroot, server.host, server.port)
server.start(block=True)
if __name__ == '__main__':
main()

Просмотреть файл

@ -44,7 +44,7 @@ try:
except IOError:
description = None
version = '0.1'
version = '0.2'
deps = []

Просмотреть файл

@ -0,0 +1,295 @@
#!/usr/bin/env python
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is mozilla.org code.
#
# The Initial Developer of the Original Code is
# the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2012
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# William Lachance <wlachance@mozilla.com>
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
import mozhttpd
import urllib2
import os
import unittest
import re
try:
import json
except ImportError:
import simplejson as json
import tempfile
here = os.path.dirname(os.path.abspath(__file__))
class ApiTest(unittest.TestCase):
resource_get_called = 0
resource_post_called = 0
resource_del_called = 0
@mozhttpd.handlers.json_response
def resource_get(self, request, objid):
self.resource_get_called += 1
return (200, { 'called': self.resource_get_called,
'id': objid,
'query': request.query })
@mozhttpd.handlers.json_response
def resource_post(self, request):
self.resource_post_called += 1
return (201, { 'called': self.resource_post_called,
'data': json.loads(request.body),
'query': request.query })
@mozhttpd.handlers.json_response
def resource_del(self, request, objid):
self.resource_del_called += 1
return (200, { 'called': self.resource_del_called,
'id': objid,
'query': request.query })
def get_url(self, path, server_port, querystr):
url = "http://127.0.0.1:%s%s" % (server_port, path)
if querystr:
url += "?%s" % querystr
return url
def try_get(self, server_port, querystr):
self.resource_get_called = 0
f = urllib2.urlopen(self.get_url('/api/resource/1', server_port, querystr))
try:
self.assertEqual(f.getcode(), 200)
except AttributeError:
pass # python 2.4
self.assertEqual(json.loads(f.read()), { 'called': 1, 'id': str(1), 'query': querystr })
self.assertEqual(self.resource_get_called, 1)
def try_post(self, server_port, querystr):
self.resource_post_called = 0
postdata = { 'hamburgers': '1234' }
try:
f = urllib2.urlopen(self.get_url('/api/resource/', server_port, querystr),
data=json.dumps(postdata))
except urllib2.HTTPError, e:
# python 2.4
self.assertEqual(e.code, 201)
body = e.fp.read()
else:
self.assertEqual(f.getcode(), 201)
body = f.read()
self.assertEqual(json.loads(body), { 'called': 1,
'data': postdata,
'query': querystr })
self.assertEqual(self.resource_post_called, 1)
def try_del(self, server_port, querystr):
self.resource_del_called = 0
opener = urllib2.build_opener(urllib2.HTTPHandler)
request = urllib2.Request(self.get_url('/api/resource/1', server_port, querystr))
request.get_method = lambda: 'DEL'
f = opener.open(request)
try:
self.assertEqual(f.getcode(), 200)
except AttributeError:
pass # python 2.4
self.assertEqual(json.loads(f.read()), { 'called': 1, 'id': str(1), 'query': querystr })
self.assertEqual(self.resource_del_called, 1)
def test_api(self):
httpd = mozhttpd.MozHttpd(port=0,
urlhandlers = [ { 'method': 'GET',
'path': '/api/resource/([^/]+)/?',
'function': self.resource_get },
{ 'method': 'POST',
'path': '/api/resource/?',
'function': self.resource_post },
{ 'method': 'DEL',
'path': '/api/resource/([^/]+)/?',
'function': self.resource_del }
])
httpd.start(block=False)
server_port = httpd.httpd.server_port
# GET
self.try_get(server_port, '')
self.try_get(server_port, '?foo=bar')
# POST
self.try_post(server_port, '')
self.try_post(server_port, '?foo=bar')
# DEL
self.try_del(server_port, '')
self.try_del(server_port, '?foo=bar')
# GET: By default we don't serve any files if we just define an API
f = None
exception_thrown = False
try:
f = urllib2.urlopen(self.get_url('/', server_port, None))
except urllib2.HTTPError, e:
self.assertEqual(e.code, 404)
exception_thrown = True
self.assertTrue(exception_thrown)
def test_nonexistent_resources(self):
# Create a server with a placeholder handler so we don't fall back
# to serving local files
httpd = mozhttpd.MozHttpd(port=0)
httpd.start(block=False)
server_port = httpd.httpd.server_port
# GET: Return 404 for non-existent endpoint
f = None
exception_thrown = False
try:
f = urllib2.urlopen(self.get_url('/api/resource/', server_port, None))
except urllib2.HTTPError, e:
self.assertEqual(e.code, 404)
exception_thrown = True
self.assertTrue(exception_thrown)
# POST: POST should also return 404
f = None
exception_thrown = False
try:
f = urllib2.urlopen(self.get_url('/api/resource/', server_port, None),
data=json.dumps({}))
except urllib2.HTTPError, e:
self.assertEqual(e.code, 404)
exception_thrown = True
self.assertTrue(exception_thrown)
# DEL: DEL should also return 404
f = None
exception_thrown = False
try:
opener = urllib2.build_opener(urllib2.HTTPHandler)
request = urllib2.Request(self.get_url('/api/resource/', server_port,
None))
request.get_method = lambda: 'DEL'
f = opener.open(request)
except urllib2.HTTPError, e:
self.assertEqual(e.code, 404)
exception_thrown = True
self.assertTrue(exception_thrown)
def test_api_with_docroot(self):
httpd = mozhttpd.MozHttpd(port=0, docroot=here,
urlhandlers = [ { 'method': 'GET',
'path': '/api/resource/([^/]+)/?',
'function': self.resource_get } ])
httpd.start(block=False)
server_port = httpd.httpd.server_port
# We defined a docroot, so we expect a directory listing
f = urllib2.urlopen(self.get_url('/', server_port, None))
try:
self.assertEqual(f.getcode(), 200)
except AttributeError:
pass # python 2.4
self.assertTrue('Directory listing for' in f.read())
# Make sure API methods still work
self.try_get(server_port, '')
self.try_get(server_port, '?foo=bar')
def test_proxy(self):
docroot = tempfile.mkdtemp()
hosts = ('mozilla.com', 'mozilla.org')
unproxied_host = 'notmozilla.org'
def url(host): return 'http://%s/' % host
index_filename = 'index.html'
def index_contents(host): return '%s index' % host
index = file(os.path.join(docroot, index_filename), 'w')
index.write(index_contents('*'))
index.close()
httpd = mozhttpd.MozHttpd(port=0, docroot=docroot)
httpd.start(block=False)
server_port = httpd.httpd.server_port
proxy_support = urllib2.ProxyHandler({'http': 'http://127.0.0.1:%d' %
server_port})
urllib2.install_opener(urllib2.build_opener(proxy_support))
for host in hosts:
f = urllib2.urlopen(url(host))
try:
self.assertEqual(f.getcode(), 200)
except AttributeError:
pass # python 2.4
self.assertEqual(f.read(), index_contents('*'))
httpd.stop()
# test separate directories per host
httpd = mozhttpd.MozHttpd(port=0, docroot=docroot, proxy_host_dirs=True)
httpd.start(block=False)
server_port = httpd.httpd.server_port
proxy_support = urllib2.ProxyHandler({'http': 'http://127.0.0.1:%d' %
server_port})
urllib2.install_opener(urllib2.build_opener(proxy_support))
# set up dirs
for host in hosts:
os.mkdir(os.path.join(docroot, host))
file(os.path.join(docroot, host, index_filename), 'w') \
.write(index_contents(host))
for host in hosts:
f = urllib2.urlopen(url(host))
try:
self.assertEqual(f.getcode(), 200)
except AttributeError:
pass # python 2.4
self.assertEqual(f.read(), index_contents(host))
exc = None
try:
urllib2.urlopen(url(unproxied_host))
except urllib2.HTTPError, e:
exc = e
self.assertNotEqual(exc, None)
self.assertEqual(exc.code, 404)
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -0,0 +1,75 @@
#!/usr/bin/env python
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is mozilla.org code.
#
# The Initial Developer of the Original Code is
# the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2011
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Joel Maher <joel.maher@gmail.com>
# William Lachance <wlachance@mozilla.com>
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
import mozhttpd
import urllib2
import os
import unittest
import re
here = os.path.dirname(os.path.abspath(__file__))
class FileListingTest(unittest.TestCase):
def check_filelisting(self, path=''):
filelist = os.listdir(here)
httpd = mozhttpd.MozHttpd(port=0, docroot=here)
httpd.start(block=False)
f = urllib2.urlopen("http://%s:%s/%s" % ('127.0.0.1', httpd.httpd.server_port, path))
for line in f.readlines():
webline = re.sub('\<[a-zA-Z0-9\-\_\.\=\"\'\/\\\%\!\@\#\$\^\&\*\(\) ]*\>', '', line.strip('\n')).strip('/').strip().strip('@')
if webline and not webline.startswith("Directory listing for"):
self.assertTrue(webline in filelist,
"File %s in dir listing corresponds to a file" % webline)
filelist.remove(webline)
self.assertFalse(filelist, "Should have no items in filelist (%s) unaccounted for" % filelist)
def test_filelist(self):
self.check_filelisting()
def test_filelist_params(self):
self.check_filelisting('?foo=bar&fleem=&foo=fleem')
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -0,0 +1,2 @@
[filelisting.py]
[api.py]

Просмотреть файл

@ -128,13 +128,15 @@ class ProcessHandlerMixin(object):
winprocess.TerminateJobObject(self._job, winprocess.ERROR_CONTROL_C_EXIT)
self.returncode = winprocess.GetExitCodeProcess(self._handle)
elif self._handle:
err = None
try:
winprocess.TerminateProcess(self._handle, winprocess.ERROR_CONTROL_C_EXIT)
except:
raise OSError("Could not terminate process")
finally:
self.returncode = winprocess.GetExitCodeProcess(self._handle)
self._cleanup()
err = "Could not terminate process"
self.returncode = winprocess.GetExitCodeProcess(self._handle)
self._cleanup()
if err is not None:
raise OSError(err)
else:
pass
else:
@ -145,15 +147,10 @@ class ProcessHandlerMixin(object):
if getattr(e, "errno", None) != 3:
# Error 3 is "no such process", which is ok
print >> sys.stderr, "Could not kill process, could not find pid: %s" % self.pid
finally:
# Try to get the exit status
if self.returncode is None:
self.returncode = subprocess.Popen._internal_poll(self)
else:
os.kill(self.pid, signal.SIGKILL)
if self.returncode is None:
self.returncode = subprocess.Popen._internal_poll(self)
if self.returncode is None:
self.returncode = subprocess.Popen._internal_poll(self)
self._cleanup()
return self.returncode
@ -392,6 +389,7 @@ falling back to not using job objects for managing child processes"""
# We use queues to synchronize between the thread and this
# function because events just didn't have robust enough error
# handling on pre-2.7 versions
err = None
try:
# timeout is the max amount of time the procmgr thread will wait for
# child processes to shutdown before killing them with extreme prejudice.
@ -400,11 +398,14 @@ falling back to not using job objects for managing child processes"""
if item[self.pid] == 'FINISHED':
self._process_events.task_done()
except:
raise OSError("IO Completion Port failed to signal process shutdown")
finally:
# Either way, let's try to get this code
self.returncode = winprocess.GetExitCodeProcess(self._handle)
self._cleanup()
err = "IO Completion Port failed to signal process shutdown"
# Either way, let's try to get this code
self.returncode = winprocess.GetExitCodeProcess(self._handle)
self._cleanup()
if err is not None:
raise OSError(err)
else:
# Not managing with job objects, so all we can reasonably do

Просмотреть файл

@ -40,25 +40,70 @@
add permissions to the profile
"""
__all__ = ['LocationsSyntaxError', 'Location', 'PermissionsManager']
__all__ = ['MissingPrimaryLocationError', 'MultiplePrimaryLocationsError',
'DuplicateLocationError', 'BadPortLocationError',
'LocationsSyntaxError', 'Location', 'ServerLocations',
'Permissions']
import codecs
import itertools
import os
import sqlite3
try:
import sqlite3
except ImportError:
from pysqlite2 import dbapi2 as sqlite3
import urlparse
class LocationError(Exception):
"Signifies an improperly formed location."
def __str__(self):
s = "Bad location"
if self.message:
s += ": %s" % self.message
return s
class MissingPrimaryLocationError(LocationError):
"No primary location defined in locations file."
def __init__(self):
LocationError.__init__(self, "missing primary location")
class MultiplePrimaryLocationsError(LocationError):
"More than one primary location defined."
def __init__(self):
LocationError.__init__(self, "multiple primary locations")
class DuplicateLocationError(LocationError):
"Same location defined twice."
def __init__(self, url):
LocationError.__init__(self, "duplicate location: %s" % url)
class BadPortLocationError(LocationError):
"Location has invalid port value."
def __init__(self, given_port):
LocationError.__init__(self, "bad value for port: %s" % given_port)
class LocationsSyntaxError(Exception):
"Signifies a syntax error on a particular line in server-locations.txt."
def __init__(self, lineno, msg = None):
def __init__(self, lineno, err=None):
self.err = err
self.lineno = lineno
self.msg = msg
def __str__(self):
s = "Syntax error on line %s" % self.lineno
if self.msg:
s += ": %s." % self.msg
if self.err:
s += ": %s." % self.err
else:
s += "."
return s
@ -73,6 +118,10 @@ class Location(object):
for attr in self.attrs:
setattr(self, attr, locals()[attr])
self.options = options
try:
int(self.port)
except ValueError:
raise BadPortLocationError(self.port)
def isEqual(self, location):
"compare scheme://host:port, but ignore options"
@ -87,84 +136,63 @@ class Location(object):
return '%s %s' % (self.url(), ','.join(self.options))
class PermissionsManager(object):
_num_permissions = 0
class ServerLocations(object):
"""Iterable collection of locations.
Use provided functions to add new locations, rather that manipulating
_locations directly, in order to check for errors and to ensure the
callback is called, if given.
"""
def __init__(self, profileDir, locations=None):
self._profileDir = profileDir
self._locations = [] # for cleanup
if locations:
if isinstance(locations, list):
for l in locations:
self.add_host(**l)
elif isinstance(locations, dict):
self.add_host(**locations)
elif os.path.exists(locations):
self.add_file(locations)
def __init__(self, filename=None, add_callback=None):
self.add_callback = add_callback
self._locations = []
self.hasPrimary = False
if filename:
self.read(filename)
def write_permission(self, location):
"""write permissions to the sqlite database"""
def __iter__(self):
return self._locations.__iter__()
# Open database and create table
permDB = sqlite3.connect(os.path.join(self._profileDir, "permissions.sqlite"))
cursor = permDB.cursor();
# SQL copied from
# http://mxr.mozilla.org/mozilla-central/source/extensions/cookie/nsPermissionManager.cpp
cursor.execute("""CREATE TABLE IF NOT EXISTS moz_hosts (
id INTEGER PRIMARY KEY,
host TEXT,
type TEXT,
permission INTEGER,
expireType INTEGER,
expireTime INTEGER)""")
def __len__(self):
return len(self._locations)
# set the permissions
permissions = {'allowXULXBL':[(location.host, 'noxul' not in location.options)]}
for perm in permissions.keys():
for host,allow in permissions[perm]:
self._num_permissions += 1
cursor.execute("INSERT INTO moz_hosts values(?, ?, ?, ?, 0, 0)",
(self._num_permissions, host, perm, 1 if allow else 2))
# Commit and close
permDB.commit()
cursor.close()
def add(self, *newLocations):
"""add locations to the database"""
for location in newLocations:
for loc in self._locations:
if loc.isEqual(location):
print >> sys.stderr, "Duplicate location: %s" % location.url()
break
else:
self._locations.append(location)
self.write_permission(location)
def add(self, location, suppress_callback=False):
if "primary" in location.options:
if self.hasPrimary:
raise MultiplePrimaryLocationsError()
self.hasPrimary = True
for loc in self._locations:
if loc.isEqual(location):
raise DuplicateLocationError(location.url())
self._locations.append(location)
if self.add_callback and not suppress_callback:
self.add_callback([location])
def add_host(self, host, port='80', scheme='http', options='privileged'):
if isinstance(options, basestring):
options = options.split(',')
self.add(Location(scheme, host, port, options))
def add_file(self, path):
"""add permissions from a locations file """
self.add(self.read_locations(path))
def read_locations(self, filename):
def read(self, filename, check_for_primary=True):
"""
Reads the file (in the format of server-locations.txt) and add all
valid locations to the self.locations array.
valid locations to the self._locations array.
If check_for_primary is True, a MissingPrimaryLocationError
exception is raised if no primary is found.
This format:
http://mxr.mozilla.org/mozilla-central/source/build/pgo/server-locations.txt
The only exception is that the port, if not defined, defaults to 80.
FIXME: Shouldn't this default to the protocol-appropriate port? Is
there any reason to have defaults at all?
"""
locationFile = codecs.open(filename, "r", "UTF-8")
locations = []
lineno = 0
seenPrimary = False
new_locations = []
for line in locationFile:
line = line.strip()
lineno += 1
@ -191,27 +219,77 @@ class PermissionsManager(object):
except ValueError:
host = netloc
port = '80'
try:
int(port)
except ValueError:
raise LocationsSyntaxError(lineno, 'bad value for port: %s' % line)
location = Location(scheme, host, port, options)
self.add(location, suppress_callback=True)
except LocationError, e:
raise LocationsSyntaxError(lineno, e)
# check for primary location
if "primary" in options:
if seenPrimary:
raise LocationsSyntaxError(lineno, "multiple primary locations")
seenPrimary = True
# add the location
locations.append(Location(scheme, host, port, options))
new_locations.append(location)
# ensure that a primary is found
if not seenPrimary:
raise LocationsSyntaxError(lineno + 1, "missing primary location")
if check_for_primary and not self.hasPrimary:
raise LocationsSyntaxError(lineno + 1,
MissingPrimaryLocationError())
return locations
if self.add_callback:
self.add_callback(new_locations)
def getNetworkPreferences(self, proxy=False):
class Permissions(object):
_num_permissions = 0
def __init__(self, profileDir, locations=None):
self._profileDir = profileDir
self._locations = ServerLocations(add_callback=self.write_db)
if locations:
if isinstance(locations, ServerLocations):
self._locations = locations
self._locations.add_callback = self.write_db
self.write_db(self._locations._locations)
elif isinstance(locations, list):
for l in locations:
self._locations.add_host(**l)
elif isinstance(locations, dict):
self._locations.add_host(**locations)
elif os.path.exists(locations):
self._locations.read(locations)
def write_db(self, locations):
"""write permissions to the sqlite database"""
# Open database and create table
permDB = sqlite3.connect(os.path.join(self._profileDir, "permissions.sqlite"))
cursor = permDB.cursor();
# SQL copied from
# http://mxr.mozilla.org/mozilla-central/source/extensions/cookie/nsPermissionManager.cpp
cursor.execute("""CREATE TABLE IF NOT EXISTS moz_hosts (
id INTEGER PRIMARY KEY,
host TEXT,
type TEXT,
permission INTEGER,
expireType INTEGER,
expireTime INTEGER)""")
for location in locations:
# set the permissions
permissions = { 'allowXULXBL': 'noxul' not in location.options }
for perm, allow in permissions.iteritems():
self._num_permissions += 1
if allow:
permission_type = 1
else:
permission_type = 2
cursor.execute("INSERT INTO moz_hosts values(?, ?, ?, ?, 0, 0)",
(self._num_permissions, location.host, perm,
permission_type))
# Commit and close
permDB.commit()
cursor.close()
def network_prefs(self, proxy=False):
"""
take known locations and generate preferences to handle permissions and proxy
returns a tuple of prefs, user_prefs
@ -219,7 +297,7 @@ class PermissionsManager(object):
# Grant God-power to all the privileged servers on which tests run.
prefs = []
privileged = filter(lambda loc: "privileged" in loc.options, self._locations)
privileged = [i for i in self._locations if "privileged" in i.options]
for (i, l) in itertools.izip(itertools.count(1), privileged):
prefs.append(("capability.principal.codebase.p%s.granted" % i, "UniversalXPConnect"))
@ -228,13 +306,13 @@ class PermissionsManager(object):
prefs.append(("capability.principal.codebase.p%s.subjectName" % i, ""))
if proxy:
user_prefs = self.pacPrefs()
user_prefs = self.pac_prefs()
else:
user_prefs = []
return prefs, user_prefs
def pacPrefs(self):
def pac_prefs(self):
"""
return preferences for Proxy Auto Config. originally taken from
http://mxr.mozilla.org/mozilla-central/source/build/automation.py.in
@ -301,7 +379,7 @@ function FindProxyForURL(url, host)
return prefs
def clean_permissions(self):
def clean_db(self):
"""Removed permissions added by mozprofile."""
# Open database and create table

Просмотреть файл

@ -44,7 +44,7 @@ __all__ = ['Profile', 'FirefoxProfile', 'ThunderbirdProfile']
import os
import tempfile
from addons import AddonManager
from permissions import PermissionsManager
from permissions import Permissions
from shutil import rmtree
try:
@ -69,6 +69,12 @@ class Profile(object):
# if true, remove installed addons/prefs afterwards
self.restore = restore
# prefs files written to
self.written_prefs = set()
# our magic markers
self.delimeters = ('#MozRunner Prefs Start', '#MozRunner Prefs End')
# Handle profile creation
self.create_new = not profile
if profile:
@ -99,8 +105,8 @@ class Profile(object):
# set permissions
self._locations = locations # store this for reconstruction
self._proxy = proxy
self.permission_manager = PermissionsManager(self.profile, locations)
prefs_js, user_js = self.permission_manager.getNetworkPreferences(proxy)
self.permissions = Permissions(self.profile, locations)
prefs_js, user_js = self.permissions.network_prefs(proxy)
self.set_preferences(prefs_js, 'prefs.js')
self.set_preferences(user_js)
@ -139,34 +145,37 @@ class Profile(object):
def set_preferences(self, preferences, filename='user.js'):
"""Adds preferences dict to profile preferences"""
# append to the file
prefs_file = os.path.join(self.profile, filename)
f = open(prefs_file, 'a')
if isinstance(preferences, dict):
# order doesn't matter
preferences = preferences.items()
# write the preferences
if preferences:
f.write('\n#MozRunner Prefs Start\n')
# note what files we've touched
self.written_prefs.add(filename)
if isinstance(preferences, dict):
# order doesn't matter
preferences = preferences.items()
# write the preferences
f.write('\n%s\n' % self.delimeters[0])
_prefs = [(simplejson.dumps(k), simplejson.dumps(v) )
for k, v in preferences]
for _pref in _prefs:
f.write('user_pref(%s, %s);\n' % _pref)
f.write('#MozRunner Prefs End\n')
f.write('%s\n' % self.delimeters[1])
f.close()
def pop_preferences(self):
def pop_preferences(self, filename):
"""
pop the last set of preferences added
returns True if popped
"""
# our magic markers
delimeters = ('#MozRunner Prefs Start', '#MozRunner Prefs End')
lines = file(os.path.join(self.profile, 'user.js')).read().splitlines()
lines = file(os.path.join(self.profile, filename)).read().splitlines()
def last_index(_list, value):
"""
returns the last index of an item;
@ -175,29 +184,32 @@ class Profile(object):
for index in reversed(range(len(_list))):
if _list[index] == value:
return index
s = last_index(lines, delimeters[0])
e = last_index(lines, delimeters[1])
s = last_index(lines, self.delimeters[0])
e = last_index(lines, self.delimeters[1])
# ensure both markers are found
if s is None:
assert e is None, '%s found without %s' % (delimeters[1], delimeters[0])
assert e is None, '%s found without %s' % (self.delimeters[1], self.delimeters[0])
return False # no preferences found
elif e is None:
assert e is None, '%s found without %s' % (delimeters[0], delimeters[1])
assert s is None, '%s found without %s' % (self.delimeters[0], self.delimeters[1])
# ensure the markers are in the proper order
assert e > s, '%s found at %s, while %s found at %s' (delimeter[1], e, delimeter[0], s)
assert e > s, '%s found at %s, while %s found at %s' % (self.delimeters[1], e, self.delimeters[0], s)
# write the prefs
cleaned_prefs = '\n'.join(lines[:s] + lines[e+1:])
f = file(os.path.join(self.profile, 'user.js'), 'w')
f.write(cleaned_prefs)
f.close()
return True
def clean_preferences(self):
"""Removed preferences added by mozrunner."""
while True:
if not self.pop_preferences():
break
for filename in self.written_prefs:
while True:
if not self.pop_preferences(filename):
break
### cleanup
@ -237,7 +249,7 @@ class Profile(object):
else:
self.clean_preferences()
self.addon_manager.clean_addons()
self.permission_manager.clean_permissions()
self.permissions.clean_db()
__del__ = cleanup
@ -268,6 +280,8 @@ class FirefoxProfile(Profile):
'extensions.update.notifyUser' : False,
# Suppress automatic safe mode after crashes
'toolkit.startup.max_resumed_crashes' : -1,
# Enable test mode to run multiple tests in parallel
'focusmanager.testmode' : True,
}
class ThunderbirdProfile(Profile):

Просмотреть файл

@ -53,6 +53,11 @@ try:
import json
except ImportError:
deps.append('simplejson')
try:
import sqlite3
except ImportError:
deps.append('pysqlite')
# take description from README
here = os.path.dirname(os.path.abspath(__file__))

Просмотреть файл

@ -1,3 +1,4 @@
[addonid.py]
[server_locations.py]
[testprofile.py]
[test_preferences.py]
[permissions.py]

Просмотреть файл

@ -0,0 +1,113 @@
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import shutil
try:
import sqlite3
except ImportError:
from pysqlite2 import dbapi2 as sqlite3
import tempfile
import unittest
from mozprofile.permissions import Permissions
class PermissionsTest(unittest.TestCase):
locations = """http://mochi.test:8888 primary,privileged
http://127.0.0.1:80 noxul
http://127.0.0.1:8888 privileged
"""
profile_dir = None
locations_file = None
def setUp(self):
self.profile_dir = tempfile.mkdtemp()
self.locations_file = tempfile.NamedTemporaryFile()
self.locations_file.write(self.locations)
self.locations_file.flush()
def tearDown(self):
if self.profile_dir:
shutil.rmtree(self.profile_dir)
if self.locations_file:
self.locations_file.close()
def test_permissions_db(self):
perms = Permissions(self.profile_dir, self.locations_file.name)
perms_db_filename = os.path.join(self.profile_dir, 'permissions.sqlite')
select_stmt = 'select host, type, permission from moz_hosts'
con = sqlite3.connect(perms_db_filename)
cur = con.cursor()
cur.execute(select_stmt)
entries = cur.fetchall()
self.assertEqual(len(entries), 3)
self.assertEqual(entries[0][0], 'mochi.test')
self.assertEqual(entries[0][1], 'allowXULXBL')
self.assertEqual(entries[0][2], 1)
self.assertEqual(entries[1][0], '127.0.0.1')
self.assertEqual(entries[1][1], 'allowXULXBL')
self.assertEqual(entries[1][2], 2)
self.assertEqual(entries[2][0], '127.0.0.1')
self.assertEqual(entries[2][1], 'allowXULXBL')
self.assertEqual(entries[2][2], 1)
perms._locations.add_host('a.b.c', options='noxul')
cur.execute(select_stmt)
entries = cur.fetchall()
self.assertEqual(len(entries), 4)
self.assertEqual(entries[3][0], 'a.b.c')
self.assertEqual(entries[3][1], 'allowXULXBL')
self.assertEqual(entries[3][2], 2)
perms.clean_db()
# table should be removed
cur.execute("select * from sqlite_master where type='table'")
entries = cur.fetchall()
self.assertEqual(len(entries), 0)
def test_nw_prefs(self):
perms = Permissions(self.profile_dir, self.locations_file.name)
prefs, user_prefs = perms.network_prefs(False)
self.assertEqual(len(user_prefs), 0)
self.assertEqual(len(prefs), 6)
self.assertEqual(prefs[0], ('capability.principal.codebase.p1.granted',
'UniversalXPConnect'))
self.assertEqual(prefs[1], ('capability.principal.codebase.p1.id',
'http://mochi.test'))
self.assertEqual(prefs[2], ('capability.principal.codebase.p1.subjectName', ''))
self.assertEqual(prefs[3], ('capability.principal.codebase.p2.granted',
'UniversalXPConnect'))
self.assertEqual(prefs[4], ('capability.principal.codebase.p2.id',
'http://127.0.0.1'))
self.assertEqual(prefs[5], ('capability.principal.codebase.p2.subjectName', ''))
prefs, user_prefs = perms.network_prefs(True)
self.assertEqual(len(user_prefs), 2)
self.assertEqual(user_prefs[0], ('network.proxy.type', 2))
self.assertEqual(user_prefs[1][0], 'network.proxy.autoconfig_url')
origins_decl = "var origins = ['http://127.0.0.1:80', 'http://127.0.0.1:8888'];"
self.assertTrue(origins_decl in user_prefs[1][1])
proxy_check = "if (isHttp) return 'PROXY mochi.test:8888'; if (isHttps || isWebSocket || isWebSocketSSL) return 'PROXY mochi.test:443';"
self.assertTrue(proxy_check in user_prefs[1][1])
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -1,64 +1,148 @@
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import shutil
import tempfile
import unittest
from mozprofile.permissions import PermissionsManager
from mozprofile.permissions import ServerLocations, \
MissingPrimaryLocationError, MultiplePrimaryLocationsError, \
DuplicateLocationError, BadPortLocationError, LocationsSyntaxError
class ServerLocationsTest(unittest.TestCase):
"""test server locations"""
locations = """
# This is the primary location from which tests run.
locations = """# This is the primary location from which tests run.
#
http://mochi.test:8888 primary,privileged
http://mochi.test:8888 primary,privileged
# a few test locations
http://127.0.0.1:80 privileged
http://127.0.0.1:8888 privileged
https://test:80 privileged
http://mochi.test:8888 privileged
http://example.org:80 privileged
http://test1.example.org:80 privileged
http://127.0.0.1:80 privileged
http://127.0.0.1:8888 privileged
https://test:80 privileged
http://example.org:80 privileged
http://test1.example.org privileged
"""
locations_no_primary = """http://secondary.test:80 privileged
http://tertiary.test:8888 privileged
"""
locations_bad_port = """http://mochi.test:8888 primary,privileged
http://127.0.0.1:80 privileged
http://127.0.0.1:8888 privileged
http://test:badport privileged
http://example.org:80 privileged
"""
def compare_location(self, location, scheme, host, port, options):
self.assertEqual(location.scheme, scheme)
self.assertEqual(location.host, host)
self.assertEqual(location.port, port)
self.assertEqual(location.options, options)
def create_temp_file(self, contents):
f = tempfile.NamedTemporaryFile()
f.write(contents)
f.flush()
return f
def test_server_locations(self):
# make a permissions manager
# needs a pointless temporary directory for now
tempdir = tempfile.mkdtemp()
permissions = PermissionsManager(tempdir)
# write a permissions file
fd, filename = tempfile.mkstemp()
os.write(fd, self.locations)
os.close(fd)
f = self.create_temp_file(self.locations)
# read the locations
locations = permissions.read_locations(filename)
locations = ServerLocations(f.name)
# ensure that they're what we expect
self.assertEqual(len(locations), 7)
self.compare_location(locations[0], 'http', 'mochi.test', '8888', ['primary', 'privileged'])
self.compare_location(locations[1], 'http', '127.0.0.1', '80', ['privileged'])
self.compare_location(locations[2], 'http', '127.0.0.1', '8888', ['privileged'])
self.compare_location(locations[3], 'https', 'test', '80', ['privileged'])
self.compare_location(locations[4], 'http', 'mochi.test', '8888', ['privileged'])
self.compare_location(locations[5], 'http', 'example.org', '80', ['privileged'])
self.compare_location(locations[6], 'http', 'test1.example.org', '80', ['privileged'])
self.assertEqual(len(locations), 6)
i = iter(locations)
self.compare_location(i.next(), 'http', 'mochi.test', '8888',
['primary', 'privileged'])
self.compare_location(i.next(), 'http', '127.0.0.1', '80',
['privileged'])
self.compare_location(i.next(), 'http', '127.0.0.1', '8888',
['privileged'])
self.compare_location(i.next(), 'https', 'test', '80', ['privileged'])
self.compare_location(i.next(), 'http', 'example.org', '80',
['privileged'])
self.compare_location(i.next(), 'http', 'test1.example.org', '80',
['privileged'])
# cleanup
del permissions
shutil.rmtree(tempdir)
os.remove(filename)
locations.add_host('mozilla.org')
self.assertEqual(len(locations), 7)
self.compare_location(i.next(), 'http', 'mozilla.org', '80',
['privileged'])
# test some errors
self.assertRaises(MultiplePrimaryLocationsError, locations.add_host,
'primary.test', options='primary')
self.assertRaises(DuplicateLocationError, locations.add_host,
'127.0.0.1')
self.assertRaises(BadPortLocationError, locations.add_host, '127.0.0.1',
port='abc')
# test some errors in locations file
f = self.create_temp_file(self.locations_no_primary)
exc = None
try:
ServerLocations(f.name)
except LocationsSyntaxError, e:
exc = e
self.assertNotEqual(exc, None)
self.assertEqual(exc.err.__class__, MissingPrimaryLocationError)
self.assertEqual(exc.lineno, 3)
# test bad port in a locations file to ensure lineno calculated
# properly.
f = self.create_temp_file(self.locations_bad_port)
exc = None
try:
ServerLocations(f.name)
except LocationsSyntaxError, e:
exc = e
self.assertNotEqual(exc, None)
self.assertEqual(exc.err.__class__, BadPortLocationError)
self.assertEqual(exc.lineno, 4)
def test_server_locations_callback(self):
class CallbackTest(object):
last_locations = None
def callback(self, locations):
self.last_locations = locations
c = CallbackTest()
f = self.create_temp_file(self.locations)
locations = ServerLocations(f.name, c.callback)
# callback should be for all locations in file
self.assertEqual(len(c.last_locations), 6)
# validate arbitrary one
self.compare_location(c.last_locations[2], 'http', '127.0.0.1', '8888',
['privileged'])
locations.add_host('a.b.c')
# callback should be just for one location
self.assertEqual(len(c.last_locations), 1)
self.compare_location(c.last_locations[0], 'http', 'a.b.c', '80',
['privileged'])
# read a second file, which should generate a callback with both
# locations.
f = self.create_temp_file(self.locations_no_primary)
locations.read(f.name)
self.assertEqual(len(c.last_locations), 2)
if __name__ == '__main__':

Просмотреть файл

@ -8,7 +8,7 @@ import unittest
from mozprofile.prefs import Preferences
from mozprofile.profile import Profile
class ProfileTest(unittest.TestCase):
class PreferencesTest(unittest.TestCase):
"""test mozprofile"""
def run_command(self, *args):
@ -122,7 +122,48 @@ browser.startup.homepage = http://github.com/
self.assertFalse(final_prefs)
lines = file(prefs_file).read().strip().splitlines()
self.assertTrue('#MozRunner Prefs Start' not in lines)
self.assertTrue('#MozRunner Prefs End' not in lines)
self.assertTrue('#MozRunner Prefs End' not in lines)
def test_preexisting_preferences(self):
"""ensure you don't clobber preexisting preferences"""
# make a pretend profile
tempdir = tempfile.mkdtemp()
try:
# make a user.js
contents = """
user_pref("webgl.enabled_for_all_sites", true);
user_pref("webgl.force-enabled", true);
"""
user_js = os.path.join(tempdir, 'user.js')
f = file(user_js, 'w')
f.write(contents)
f.close()
# make sure you can read it
prefs = Preferences.read_prefs(user_js)
original_prefs = [('webgl.enabled_for_all_sites', True), ('webgl.force-enabled', True)]
self.assertTrue(prefs == original_prefs)
# now read this as a profile
profile = Profile(tempdir, preferences={"browser.download.dir": "/home/jhammel"})
# make sure the new pref is now there
new_prefs = original_prefs[:] + [("browser.download.dir", "/home/jhammel")]
prefs = Preferences.read_prefs(user_js)
self.assertTrue(prefs == new_prefs)
# clean up the added preferences
profile.cleanup()
del profile
# make sure you have the original preferences
prefs = Preferences.read_prefs(user_js)
self.assertTrue(prefs == original_prefs)
except:
shutil.rmtree(tempdir)
raise
def test_json(self):
_prefs = {"browser.startup.homepage": "http://planet.mozilla.org/"}

Просмотреть файл

@ -1,5 +1,9 @@
# mozbase test manifest, in the format of
# https://github.com/mozilla/mozbase/blob/master/manifestdestiny/README.txt
# run with
# https://github.com/mozilla/mozbase/blob/master/test.py
[include:mozprocess/tests/manifest.ini]
[include:mozprofile/tests/manifest.ini]
[include:mozhttpd/tests/manifest.ini]

Просмотреть файл

@ -1,7 +1,8 @@
#!/usr/bin/env python
"""
run mozbase tests
run mozbase tests from a manifest,
by default https://github.com/mozilla/mozbase/blob/master/test-manifest.ini
"""
import imp