# # ***** BEGIN LICENSE BLOCK ***** # Version: MPL 1.1/GPL 2.0/LGPL 2.1 # # The contents of this file are subject to the Mozilla Public License Version # 1.1 (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # http://www.mozilla.org/MPL/ # # Software distributed under the License is distributed on an "AS IS" basis, # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License # for the specific language governing rights and limitations under the # License. # # The Original Code is mozilla.org code. # # The Initial Developer of the Original Code is # Mozilla Foundation. # Portions created by the Initial Developer are Copyright (C) 2008 # the Initial Developer. All Rights Reserved. # # Contributor(s): # Robert Sayre # Jeff Walden # # Alternatively, the contents of this file may be used under the terms of # either the GNU General Public License Version 2 or later (the "GPL"), or # the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), # in which case the provisions of the GPL or the LGPL are applicable instead # of those above. If you wish to allow use of your version of this file only # under the terms of either the GPL or the LGPL, and not to allow others to # use your version of this file under the terms of the MPL, indicate your # decision by deleting the provisions above and replace them with the notice # and other provisions required by the GPL or the LGPL. If you do not delete # the provisions above, a recipient may use your version of this file under # the terms of any one of the MPL, the GPL or the LGPL. # # ***** END LICENSE BLOCK ***** import codecs from datetime import datetime import itertools import logging import os import re import shutil import signal import subprocess import sys import threading """ Runs the browser from a script, and provides useful utilities for setting up the browser environment. """ SCRIPT_DIR = os.path.abspath(os.path.realpath(os.path.dirname(sys.argv[0]))) __all__ = [ "UNIXISH", "IS_WIN32", "IS_MAC", "runApp", "Process", "initializeProfile", "DIST_BIN", "DEFAULT_APP", "CERTS_SRC_DIR", "environment", ] # These are generated in mozilla/build/Makefile.in #expand DIST_BIN = __XPC_BIN_PATH__ #expand IS_WIN32 = len("__WIN32__") != 0 #expand IS_MAC = __IS_MAC__ != 0 #ifdef IS_CYGWIN #expand IS_CYGWIN = __IS_CYGWIN__ == 1 #else IS_CYGWIN = False #endif #expand IS_CAMINO = __IS_CAMINO__ != 0 #expand BIN_SUFFIX = __BIN_SUFFIX__ UNIXISH = not IS_WIN32 and not IS_MAC #expand DEFAULT_APP = "./" + __BROWSER_PATH__ #expand CERTS_SRC_DIR = __CERTS_SRC_DIR__ #expand IS_TEST_BUILD = __IS_TEST_BUILD__ #expand IS_DEBUG_BUILD = __IS_DEBUG_BUILD__ ########### # LOGGING # ########### # We use the logging system here primarily because it'll handle multiple # threads, which is needed to process the output of the server and application # processes simultaneously. log = logging.getLogger() handler = logging.StreamHandler(sys.stdout) log.setLevel(logging.INFO) log.addHandler(handler) ################# # SUBPROCESSING # ################# class Process(subprocess.Popen): """ Represents our view of a subprocess. It adds a kill() method which allows it to be stopped explicitly. """ def kill(self): if IS_WIN32: import platform pid = "%i" % self.pid if platform.release() == "2000": # Windows 2000 needs 'kill.exe' from the 'Windows 2000 Resource Kit tools'. (See bug 475455.) try: subprocess.Popen(["kill", "-f", pid]).wait() except: log.info("TEST-UNEXPECTED-FAIL | (automation.py) | Missing 'kill' utility to kill process with pid=%s. Kill it manually!", pid) else: # Windows XP and later. subprocess.Popen(["taskkill", "/F", "/PID", pid]).wait() else: os.kill(self.pid, signal.SIGKILL) ################# # PROFILE SETUP # ################# class SyntaxError(Exception): "Signifies a syntax error on a particular line in server-locations.txt." def __init__(self, lineno, msg = None): self.lineno = lineno self.msg = msg def __str__(self): s = "Syntax error on line " + str(self.lineno) if self.msg: s += ": %s." % self.msg else: s += "." return s class Location: "Represents a location line in server-locations.txt." def __init__(self, scheme, host, port, options): self.scheme = scheme self.host = host self.port = port self.options = options def readLocations(locationsPath = "server-locations.txt"): """ Reads the locations at which the Mochitest HTTP server is available from server-locations.txt. """ locationFile = codecs.open(locationsPath, "r", "UTF-8") # Perhaps more detail than necessary, but it's the easiest way to make sure # we get exactly the format we want. See server-locations.txt for the exact # format guaranteed here. lineRe = re.compile(r"^(?P[a-z][-a-z0-9+.]*)" r"://" r"(?P" r"\d+\.\d+\.\d+\.\d+" r"|" r"(?:[a-z0-9](?:[-a-z0-9]*[a-z0-9])?\.)*" r"[a-z](?:[-a-z0-9]*[a-z0-9])?" r")" r":" r"(?P\d+)" r"(?:" r"\s+" r"(?P\S+(?:,\S+)*)" r")?$") locations = [] lineno = 0 seenPrimary = False for line in locationFile: lineno += 1 if line.startswith("#") or line == "\n": continue match = lineRe.match(line) if not match: raise SyntaxError(lineno) options = match.group("options") if options: options = options.split(",") if "primary" in options: if seenPrimary: raise SyntaxError(lineno, "multiple primary locations") seenPrimary = True else: options = [] locations.append(Location(match.group("scheme"), match.group("host"), match.group("port"), options)) if not seenPrimary: raise SyntaxError(lineno + 1, "missing primary location") return locations def initializeProfile(profileDir): "Sets up the standard testing profile." # Start with a clean slate. shutil.rmtree(profileDir, True) os.mkdir(profileDir) prefs = [] part = """\ user_pref("browser.dom.window.dump.enabled", true); user_pref("dom.allow_scripts_to_close_windows", true); user_pref("dom.disable_open_during_load", false); user_pref("dom.max_script_run_time", 0); // no slow script dialogs user_pref("signed.applets.codebase_principal_support", true); user_pref("security.warn_submit_insecure", false); user_pref("browser.shell.checkDefaultBrowser", false); user_pref("shell.checkDefaultClient", false); user_pref("browser.warnOnQuit", false); user_pref("accessibility.typeaheadfind.autostart", false); user_pref("javascript.options.showInConsole", true); user_pref("layout.debug.enable_data_xbl", true); user_pref("browser.EULA.override", true); user_pref("javascript.options.jit.content", true); user_pref("gfx.color_management.force_srgb", true); user_pref("network.manage-offline-status", false); user_pref("security.default_personal_cert", "Select Automatically"); // Need to client auth test be w/o any dialogs user_pref("network.http.prompt-temp-redirect", false); user_pref("camino.warn_when_closing", false); // Camino-only, harmless to others """ prefs.append(part) # Increase the max script run time 10-fold for debug builds if (IS_DEBUG_BUILD): prefs.append("""\ user_pref("dom.max_script_run_time", 100); user_pref("dom.max_chrome_script_run_time", 200); """) locations = readLocations() # Grant God-power to all the privileged servers on which tests run. privileged = filter(lambda loc: "privileged" in loc.options, locations) for (i, l) in itertools.izip(itertools.count(1), privileged): part = """ user_pref("capability.principal.codebase.p%(i)d.granted", "UniversalXPConnect UniversalBrowserRead UniversalBrowserWrite \ UniversalPreferencesRead UniversalPreferencesWrite \ UniversalFileRead"); user_pref("capability.principal.codebase.p%(i)d.id", "%(origin)s"); user_pref("capability.principal.codebase.p%(i)d.subjectName", ""); """ % { "i": i, "origin": (l.scheme + "://" + l.host + ":" + l.port) } prefs.append(part) # We need to proxy every server but the primary one. origins = ["'%s://%s:%s'" % (l.scheme, l.host, l.port) for l in filter(lambda l: "primary" not in l.options, locations)] origins = ", ".join(origins) pacURL = """data:text/plain, function FindProxyForURL(url, host) { var origins = [%(origins)s]; var regex = new RegExp('^([a-z][-a-z0-9+.]*)' + '://' + '(?:[^/@]*@)?' + '(.*?)' + '(?::(\\\\\\\\d+))?/'); var matches = regex.exec(url); if (!matches) return 'DIRECT'; var isHttp = matches[1] == 'http'; var isHttps = matches[1] == 'https'; if (!matches[3]) { if (isHttp) matches[3] = '80'; if (isHttps) matches[3] = '443'; } var origin = matches[1] + '://' + matches[2] + ':' + matches[3]; if (origins.indexOf(origin) < 0) return 'DIRECT'; if (isHttp) return 'PROXY 127.0.0.1:8888'; if (isHttps) return 'PROXY 127.0.0.1:4443'; return 'DIRECT'; }""" % { "origins": origins } pacURL = "".join(pacURL.splitlines()) part = """ user_pref("network.proxy.type", 2); user_pref("network.proxy.autoconfig_url", "%(pacURL)s"); user_pref("camino.use_system_proxy_settings", false); // Camino-only, harmless to others """ % {"pacURL": pacURL} prefs.append(part) # write the preferences prefsFile = open(profileDir + "/" + "user.js", "a") prefsFile.write("".join(prefs)) prefsFile.close() def fillCertificateDB(profileDir, certPath, utilityPath, xrePath): pwfilePath = os.path.join(profileDir, ".crtdbpw") pwfile = open(pwfilePath, "w") pwfile.write("\n") pwfile.close() # Create head of the ssltunnel configuration file sslTunnelConfigPath = os.path.join(profileDir, "ssltunnel.cfg") sslTunnelConfig = open(sslTunnelConfigPath, "w") sslTunnelConfig.write("httpproxy:1\n") sslTunnelConfig.write("certdbdir:%s\n" % certPath) sslTunnelConfig.write("forward:127.0.0.1:8888\n") sslTunnelConfig.write("listen:*:4443:pgo server certificate\n") # Configure automatic certificate and bind custom certificates, client authentication locations = readLocations() locations.pop(0) for loc in locations: if loc.scheme == "https" and "nocert" not in loc.options: customCertRE = re.compile("^cert=(?P[0-9a-zA-Z_ ]+)") clientAuthRE = re.compile("^clientauth=(?P[a-z]+)") for option in loc.options: match = customCertRE.match(option) if match: customcert = match.group("nickname"); sslTunnelConfig.write("listen:%s:%s:4443:%s\n" % (loc.host, loc.port, customcert)) match = clientAuthRE.match(option) if match: clientauth = match.group("clientauth"); sslTunnelConfig.write("clientauth:%s:%s:4443:%s\n" % (loc.host, loc.port, clientauth)) sslTunnelConfig.close() # Pre-create the certification database for the profile env = environment(xrePath = xrePath) certutil = os.path.join(utilityPath, "certutil" + BIN_SUFFIX) pk12util = os.path.join(utilityPath, "pk12util" + BIN_SUFFIX) status = Process([certutil, "-N", "-d", profileDir, "-f", pwfilePath], env = env).wait() if status != 0: return status # Walk the cert directory and add custom CAs and client certs files = os.listdir(certPath) for item in files: root, ext = os.path.splitext(item) if ext == ".ca": trustBits = "CT,," if root.endswith("-object"): trustBits = "CT,,CT" Process([certutil, "-A", "-i", os.path.join(certPath, item), "-d", profileDir, "-f", pwfilePath, "-n", root, "-t", trustBits], env = env).wait() if ext == ".client": Process([pk12util, "-i", os.path.join(certPath, item), "-w", pwfilePath, "-d", profileDir], env = env).wait() os.unlink(pwfilePath) return 0 def environment(env = None, xrePath = DIST_BIN): if env == None: env = dict(os.environ) ldLibraryPath = os.path.abspath(os.path.join(SCRIPT_DIR, xrePath)) if UNIXISH or IS_MAC: envVar = "LD_LIBRARY_PATH" if IS_MAC: envVar = "DYLD_LIBRARY_PATH" if envVar in env: ldLibraryPath = ldLibraryPath + ":" + env[envVar] env[envVar] = ldLibraryPath elif IS_WIN32: env["PATH"] = env["PATH"] + ";" + ldLibraryPath return env ############### # RUN THE APP # ############### def runApp(testURL, env, app, profileDir, extraArgs, runSSLTunnel = False, utilityPath = DIST_BIN, xrePath = DIST_BIN, certPath = CERTS_SRC_DIR): "Run the app, log the duration it took to execute, return the status code." if IS_TEST_BUILD and runSSLTunnel: # create certificate database for the profile certificateStatus = fillCertificateDB(profileDir, certPath, utilityPath, xrePath) if certificateStatus != 0: log.info("TEST-UNEXPECTED FAIL | (automation.py) | Certificate integration failed") return certificateStatus # start ssltunnel to provide https:// URLs capability ssltunnel = os.path.join(utilityPath, "ssltunnel" + BIN_SUFFIX) ssltunnelProcess = Process([ssltunnel, os.path.join(profileDir, "ssltunnel.cfg")], env = environment(xrePath = xrePath)) log.info("INFO | (automation.py) | SSL tunnel pid: %d", ssltunnelProcess.pid) # now run with the profile we created cmd = app if IS_MAC and not IS_CAMINO and not cmd.endswith("-bin"): cmd += "-bin" cmd = os.path.abspath(cmd) args = [] if IS_MAC: args.append("-foreground") if IS_CYGWIN: profileDirectory = commands.getoutput("cygpath -w \"" + profileDir + "/\"") else: profileDirectory = profileDir + "/" args.extend(("-no-remote", "-profile", profileDirectory)) if testURL is not None: if IS_CAMINO: args.extend(("-url", testURL)) else: args.append((testURL)) args.extend(extraArgs) startTime = datetime.now() proc = Process([cmd] + args, env = environment(env), stdout = subprocess.PIPE, stderr = subprocess.STDOUT) log.info("INFO | (automation.py) | Application pid: %d", proc.pid) line = proc.stdout.readline() while line != "": log.info(line.rstrip()) line = proc.stdout.readline() status = proc.wait() if status != 0: log.info("TEST-UNEXPECTED-FAIL | (automation.py) | Exited with code %d during test run", status) log.info("INFO | (automation.py) | Application ran for: %s", str(datetime.now() - startTime)) if IS_TEST_BUILD and runSSLTunnel: ssltunnelProcess.kill() return status