зеркало из https://github.com/mozilla/gecko-dev.git
Bug 1805251 - Add support for testing SmartBlock shims to the webcompat test framework; r=jgraham DONTBUILD
Differential Revision: https://phabricator.services.mozilla.com/D164500
This commit is contained in:
Родитель
804dfdf01a
Коммит
912cdfed57
|
@ -0,0 +1,350 @@
|
|||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import time
|
||||
|
||||
import webdriver
|
||||
|
||||
|
||||
class Client:
|
||||
def __init__(self, session, event_loop):
|
||||
self.session = session
|
||||
self.event_loop = event_loop
|
||||
self.content_blocker_loaded = False
|
||||
|
||||
@property
|
||||
def current_url(self):
|
||||
return self.session.url
|
||||
|
||||
@property
|
||||
def alert(self):
|
||||
return self.session.alert
|
||||
|
||||
@property
|
||||
def context(self):
|
||||
return self.session.send_session_command("GET", "moz/context")
|
||||
|
||||
@context.setter
|
||||
def context(self, context):
|
||||
self.session.send_session_command("POST", "moz/context", {"context": context})
|
||||
|
||||
@contextlib.contextmanager
|
||||
def using_context(self, context):
|
||||
orig_context = self.context
|
||||
needs_change = context != orig_context
|
||||
|
||||
if needs_change:
|
||||
self.context = context
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
if needs_change:
|
||||
self.context = orig_context
|
||||
|
||||
def wait_for_content_blocker(self):
|
||||
if not self.content_blocker_loaded:
|
||||
with self.using_context("chrome"):
|
||||
self.session.execute_async_script(
|
||||
"""
|
||||
const done = arguments[0],
|
||||
signal = "safebrowsing-update-finished";
|
||||
function finish() {
|
||||
Services.obs.removeObserver(finish, signal);
|
||||
done();
|
||||
}
|
||||
Services.obs.addObserver(finish, signal);
|
||||
"""
|
||||
)
|
||||
self.content_blocker_loaded = True
|
||||
|
||||
@property
|
||||
def keyboard(self):
|
||||
return self.session.actions.sequence("key", "keyboard_id")
|
||||
|
||||
@property
|
||||
def mouse(self):
|
||||
return self.session.actions.sequence(
|
||||
"pointer", "pointer_id", {"pointerType": "mouse"}
|
||||
)
|
||||
|
||||
@property
|
||||
def pen(self):
|
||||
return self.session.actions.sequence(
|
||||
"pointer", "pointer_id", {"pointerType": "touch"}
|
||||
)
|
||||
|
||||
@property
|
||||
def touch(self):
|
||||
return self.session.actions.sequence(
|
||||
"pointer", "pointer_id", {"pointerType": "pen"}
|
||||
)
|
||||
|
||||
@property
|
||||
def wheel(self):
|
||||
return self.session.actions.sequence("wheel", "wheel_id")
|
||||
|
||||
@property
|
||||
def modifier_key(self):
|
||||
if self.session.capabilities["platformName"] == "mac":
|
||||
return "\ue03d" # meta (command)
|
||||
else:
|
||||
return "\ue009" # control
|
||||
|
||||
async def navigate(self, url, timeout=None, await_console_message=None):
|
||||
if timeout is not None:
|
||||
old_timeout = self.session.timeouts.page_load
|
||||
self.session.timeouts.page_load = timeout
|
||||
if self.session.test_config.get("use_pbm") or self.session.test_config.get(
|
||||
"use_strict_etp"
|
||||
):
|
||||
print("waiting for content blocker...")
|
||||
self.wait_for_content_blocker()
|
||||
if await_console_message is not None:
|
||||
console_message = self.promise_console_message(await_console_message)
|
||||
await self.session.bidi_session.session.subscribe(events=["log.entryAdded"])
|
||||
try:
|
||||
self.session.url = url
|
||||
except webdriver.error.TimeoutException as e:
|
||||
if timeout is None:
|
||||
raise e
|
||||
if await_console_message is not None:
|
||||
await console_message
|
||||
await self.session.bidi_session.session.unsubscribe(
|
||||
events=["log.entryAdded"]
|
||||
)
|
||||
if timeout is not None:
|
||||
self.session.timeouts.page_load = old_timeout
|
||||
|
||||
def back(self):
|
||||
self.session.back()
|
||||
|
||||
def switch_to_frame(self, frame):
|
||||
return self.session.transport.send(
|
||||
"POST",
|
||||
"session/{session_id}/frame".format(**vars(self.session)),
|
||||
{"id": frame},
|
||||
encoder=webdriver.protocol.Encoder,
|
||||
decoder=webdriver.protocol.Decoder,
|
||||
session=self.session,
|
||||
)
|
||||
|
||||
def switch_frame(self, frame):
|
||||
self.session.switch_frame(frame)
|
||||
|
||||
async def load_page_and_wait_for_iframe(
|
||||
self, url, finder, loads=1, timeout=None, **kwargs
|
||||
):
|
||||
while loads > 0:
|
||||
await self.navigate(url, **kwargs)
|
||||
frame = self.await_element(finder, timeout=timeout)
|
||||
loads -= 1
|
||||
self.switch_frame(frame)
|
||||
return frame
|
||||
|
||||
def promise_bidi_event(self, event_name: str, check_fn=None, timeout=5):
|
||||
future = self.event_loop.create_future()
|
||||
|
||||
async def on_event(method, data):
|
||||
print("on_event", method, data)
|
||||
if check_fn is not None and not check_fn(method, data):
|
||||
return
|
||||
remove_listener()
|
||||
future.set_result(data)
|
||||
|
||||
remove_listener = self.session.bidi_session.add_event_listener(
|
||||
event_name, on_event
|
||||
)
|
||||
return asyncio.wait_for(future, timeout=timeout)
|
||||
|
||||
def promise_console_message(self, msg):
|
||||
def check_messages(method, data):
|
||||
if "text" in data:
|
||||
if msg in data["text"]:
|
||||
return True
|
||||
if "args" in data and len(data["args"]) and "value" in data["args"][0]:
|
||||
if msg in data["args"][0]["value"]:
|
||||
return True
|
||||
|
||||
return self.promise_bidi_event("log.entryAdded", check_messages)
|
||||
|
||||
def execute_script(self, script, *args):
|
||||
return self.session.execute_script(script, args=args)
|
||||
|
||||
def execute_async_script(self, script, *args, **kwargs):
|
||||
return self.session.execute_async_script(script, args, **kwargs)
|
||||
|
||||
def clear_all_cookies(self):
|
||||
self.session.transport.send(
|
||||
"DELETE", "session/%s/cookie" % self.session.session_id
|
||||
)
|
||||
|
||||
def _do_is_displayed_check(self, ele, is_displayed):
|
||||
if ele is None:
|
||||
return None
|
||||
|
||||
if type(ele) in [list, tuple]:
|
||||
return [x for x in ele if self._do_is_displayed_check(x, is_displayed)]
|
||||
|
||||
if is_displayed is False and ele and self.is_displayed(ele):
|
||||
return None
|
||||
if is_displayed is True and ele and not self.is_displayed(ele):
|
||||
return None
|
||||
return ele
|
||||
|
||||
def find_css(self, *args, all=False, is_displayed=None, **kwargs):
|
||||
try:
|
||||
ele = self.session.find.css(*args, all=all, **kwargs)
|
||||
return self._do_is_displayed_check(ele, is_displayed)
|
||||
except webdriver.error.NoSuchElementException:
|
||||
return None
|
||||
|
||||
def find_xpath(self, xpath, all=False, is_displayed=None):
|
||||
route = "elements" if all else "element"
|
||||
body = {"using": "xpath", "value": xpath}
|
||||
try:
|
||||
ele = self.session.send_session_command("POST", route, body)
|
||||
return self._do_is_displayed_check(ele, is_displayed)
|
||||
except webdriver.error.NoSuchElementException:
|
||||
return None
|
||||
|
||||
def find_text(self, text, is_displayed=None, **kwargs):
|
||||
try:
|
||||
ele = self.find_xpath(f"//*[contains(text(),'{text}')]", **kwargs)
|
||||
return self._do_is_displayed_check(ele, is_displayed)
|
||||
except webdriver.error.NoSuchElementException:
|
||||
return None
|
||||
|
||||
def find_element(self, finder, is_displayed=None, **kwargs):
|
||||
ele = finder.find(self, **kwargs)
|
||||
return self._do_is_displayed_check(ele, is_displayed)
|
||||
|
||||
def await_css(self, selector, **kwargs):
|
||||
return self.await_element(self.css(selector), **kwargs)
|
||||
|
||||
def await_xpath(self, selector, **kwargs):
|
||||
return self.await_element(self.xpath(selector), **kwargs)
|
||||
|
||||
def await_text(self, selector, *args, **kwargs):
|
||||
return self.await_element(self.text(selector), **kwargs)
|
||||
|
||||
def await_element(self, finder, **kwargs):
|
||||
return self.await_first_element_of([finder], **kwargs)[0]
|
||||
|
||||
class css:
|
||||
def __init__(self, selector):
|
||||
self.selector = selector
|
||||
|
||||
def find(self, client, **kwargs):
|
||||
return client.find_css(self.selector, **kwargs)
|
||||
|
||||
class xpath:
|
||||
def __init__(self, selector):
|
||||
self.selector = selector
|
||||
|
||||
def find(self, client, **kwargs):
|
||||
return client.find_xpath(self.selector, **kwargs)
|
||||
|
||||
class text:
|
||||
def __init__(self, selector):
|
||||
self.selector = selector
|
||||
|
||||
def find(self, client, **kwargs):
|
||||
return client.find_text(self.selector, **kwargs)
|
||||
|
||||
def await_first_element_of(self, finders, timeout=None, delay=0.25, **kwargs):
|
||||
t0 = time.time()
|
||||
|
||||
if timeout is None:
|
||||
timeout = 10
|
||||
|
||||
found = [None for finder in finders]
|
||||
|
||||
exc = None
|
||||
while time.time() < t0 + timeout:
|
||||
for i, finder in enumerate(finders):
|
||||
try:
|
||||
result = finder.find(self, **kwargs)
|
||||
if result:
|
||||
found[i] = result
|
||||
return found
|
||||
except webdriver.error.NoSuchElementException as e:
|
||||
exc = e
|
||||
time.sleep(delay)
|
||||
raise exc if exc is not None else webdriver.error.NoSuchElementException
|
||||
return found
|
||||
|
||||
async def dom_ready(self, timeout=None):
|
||||
if timeout is None:
|
||||
timeout = 20
|
||||
|
||||
async def wait():
|
||||
return self.session.execute_async_script(
|
||||
"""
|
||||
const cb = arguments[0];
|
||||
setInterval(() => {
|
||||
if (document.readyState === "complete") {
|
||||
cb();
|
||||
}
|
||||
}, 500);
|
||||
"""
|
||||
)
|
||||
|
||||
task = asyncio.create_task(wait())
|
||||
return await asyncio.wait_for(task, timeout)
|
||||
|
||||
def is_float_cleared(self, elem1, elem2):
|
||||
return self.session.execute_script(
|
||||
"""return (function(a, b) {
|
||||
// Ensure that a is placed under b (and not to its right)
|
||||
return a?.offsetTop >= b?.offsetTop + b?.offsetHeight &&
|
||||
a?.offsetLeft < b?.offsetLeft + b?.offsetWidth;
|
||||
}(arguments[0], arguments[1]));""",
|
||||
elem1,
|
||||
elem2,
|
||||
)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def assert_getUserMedia_called(self):
|
||||
self.execute_script(
|
||||
"""
|
||||
navigator.mediaDevices.getUserMedia =
|
||||
navigator.mozGetUserMedia =
|
||||
navigator.getUserMedia =
|
||||
() => { window.__gumCalled = true; };
|
||||
"""
|
||||
)
|
||||
yield
|
||||
assert self.execute_script("return window.__gumCalled === true;")
|
||||
|
||||
def await_element_hidden(self, finder, timeout=None, delay=0.25):
|
||||
t0 = time.time()
|
||||
|
||||
if timeout is None:
|
||||
timeout = 20
|
||||
|
||||
elem = finder.find(self)
|
||||
while time.time() < t0 + timeout:
|
||||
try:
|
||||
if self.is_displayed(elem):
|
||||
time.sleep(delay)
|
||||
except webdriver.error.StaleElementReferenceException:
|
||||
return
|
||||
|
||||
def is_displayed(self, element):
|
||||
if element is None:
|
||||
return False
|
||||
|
||||
return self.session.execute_script(
|
||||
"""
|
||||
const e = arguments[0],
|
||||
s = window.getComputedStyle(e),
|
||||
v = s.visibility === "visible",
|
||||
o = Math.abs(parseFloat(s.opacity));
|
||||
return e.getClientRects().length && v && (isNaN(o) || o === 1.0);
|
||||
""",
|
||||
args=[element],
|
||||
)
|
|
@ -0,0 +1,191 @@
|
|||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
import webdriver
|
||||
|
||||
from client import Client
|
||||
|
||||
APS_PREF = "privacy.partition.always_partition_third_party_non_cookie_storage"
|
||||
CB_PBM_PREF = "network.cookie.cookieBehavior.pbmode"
|
||||
CB_PREF = "network.cookie.cookieBehavior"
|
||||
INJECTIONS_PREF = "extensions.webcompat.perform_injections"
|
||||
PBM_PREF = "browser.privatebrowsing.autostart"
|
||||
PIP_OVERRIDES_PREF = "extensions.webcompat.enable_picture_in_picture_overrides"
|
||||
SHIMS_PREF = "extensions.webcompat.enable_shims"
|
||||
STRICT_ETP_PREF = "privacy.trackingprotection.enabled"
|
||||
UA_OVERRIDES_PREF = "extensions.webcompat.perform_ua_overrides"
|
||||
|
||||
|
||||
class WebDriver:
|
||||
def __init__(self, config):
|
||||
self.browser_binary = config.getoption("browser_binary")
|
||||
self.webdriver_binary = config.getoption("webdriver_binary")
|
||||
self.port = config.getoption("webdriver_port")
|
||||
self.wsPort = config.getoption("webdriver_ws_port")
|
||||
self.headless = config.getoption("headless")
|
||||
self.proc = None
|
||||
|
||||
def command_line_driver(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def capabilities(self, test_config):
|
||||
raise NotImplementedError
|
||||
|
||||
def __enter__(self):
|
||||
assert self.proc is None
|
||||
self.proc = subprocess.Popen(self.command_line_driver())
|
||||
return self
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
self.proc.kill()
|
||||
|
||||
|
||||
class FirefoxWebDriver(WebDriver):
|
||||
def command_line_driver(self):
|
||||
return [
|
||||
self.webdriver_binary,
|
||||
"--port",
|
||||
str(self.port),
|
||||
"--websocket-port",
|
||||
str(self.wsPort),
|
||||
"-vv",
|
||||
]
|
||||
|
||||
def capabilities(self, test_config):
|
||||
prefs = {}
|
||||
|
||||
if "aps" in test_config:
|
||||
prefs[APS_PREF] = test_config["aps"]
|
||||
|
||||
if "use_interventions" in test_config:
|
||||
value = test_config["use_interventions"]
|
||||
prefs[INJECTIONS_PREF] = value
|
||||
prefs[UA_OVERRIDES_PREF] = value
|
||||
prefs[PIP_OVERRIDES_PREF] = value
|
||||
|
||||
if "use_pbm" in test_config:
|
||||
prefs[PBM_PREF] = test_config["use_pbm"]
|
||||
|
||||
if "use_shims" in test_config:
|
||||
prefs[SHIMS_PREF] = test_config["use_shims"]
|
||||
|
||||
if "use_strict_etp" in test_config:
|
||||
prefs[STRICT_ETP_PREF] = test_config["use_strict_etp"]
|
||||
|
||||
if "without_tcp" in test_config:
|
||||
cookieBehavior = 4 if test_config["without_tcp"] else 5
|
||||
prefs[CB_PREF] = cookieBehavior
|
||||
prefs[CB_PBM_PREF] = cookieBehavior
|
||||
|
||||
fx_options = {"binary": self.browser_binary, "prefs": prefs}
|
||||
|
||||
if self.headless:
|
||||
fx_options["args"] = ["--headless"]
|
||||
|
||||
return {
|
||||
"pageLoadStrategy": "normal",
|
||||
"moz:firefoxOptions": fx_options,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def should_do_2fa(request):
|
||||
return request.config.getoption("do2fa", False)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def config_file(request):
|
||||
path = request.config.getoption("config")
|
||||
if not path:
|
||||
return None
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bug_number(request):
|
||||
return re.findall("\d+", str(request.fspath.basename))[0]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def credentials(bug_number, config_file):
|
||||
if not config_file:
|
||||
pytest.skip(f"login info required for bug #{bug_number}")
|
||||
return None
|
||||
|
||||
try:
|
||||
credentials = config_file[bug_number]
|
||||
except KeyError:
|
||||
pytest.skip(f"no login for bug #{bug_number} found")
|
||||
return
|
||||
|
||||
return {"username": credentials["username"], "password": credentials["password"]}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def driver(pytestconfig):
|
||||
if pytestconfig.getoption("browser") == "firefox":
|
||||
cls = FirefoxWebDriver
|
||||
else:
|
||||
assert False
|
||||
|
||||
with cls(pytestconfig) as driver_instance:
|
||||
yield driver_instance
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def event_loop():
|
||||
return asyncio.get_event_loop_policy().new_event_loop()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
async def client(session, event_loop):
|
||||
return Client(session, event_loop)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
async def session(driver, test_config):
|
||||
caps = driver.capabilities(test_config)
|
||||
caps.update(
|
||||
{
|
||||
"acceptInsecureCerts": True,
|
||||
"webSocketUrl": True,
|
||||
}
|
||||
)
|
||||
caps = {"alwaysMatch": caps}
|
||||
print(caps)
|
||||
|
||||
session = None
|
||||
for i in range(0, 15):
|
||||
try:
|
||||
if not session:
|
||||
session = webdriver.Session(
|
||||
"localhost", driver.port, capabilities=caps, enable_bidi=True
|
||||
)
|
||||
session.test_config = test_config
|
||||
session.start()
|
||||
break
|
||||
except ConnectionRefusedError:
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
await session.bidi_session.start()
|
||||
|
||||
yield session
|
||||
|
||||
await session.bidi_session.end()
|
||||
session.end()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def skip_platforms(bug_number, request, session):
|
||||
platform = session.capabilities["platformName"]
|
||||
if request.node.get_closest_marker("skip_platforms"):
|
||||
if request.node.get_closest_marker("skip_platforms").args[0] == platform:
|
||||
pytest.skip(f"Bug #{bug_number} skipped on platform ({platform})")
|
|
@ -2,12 +2,8 @@
|
|||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
from selenium.webdriver import Remote
|
||||
from ..fixtures import * # noqa: F403
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc):
|
||||
|
@ -18,132 +14,38 @@ def pytest_generate_tests(metafunc):
|
|||
|
||||
marks = [mark.name for mark in metafunc.function.pytestmark]
|
||||
|
||||
otherargs = {}
|
||||
argvalues = []
|
||||
ids = []
|
||||
|
||||
skip_platforms = []
|
||||
if "skip_platforms" in marks:
|
||||
for mark in metafunc.function.pytestmark:
|
||||
if mark.name == "skip_platforms":
|
||||
skip_platforms = mark.args
|
||||
otherargs["skip_platforms"] = mark.args
|
||||
|
||||
if "with_interventions" in marks:
|
||||
argvalues.append([{"interventions": True, "skip_platforms": skip_platforms}])
|
||||
argvalues.append([dict({"interventions": True}, **otherargs)])
|
||||
ids.append("with_interventions")
|
||||
|
||||
if "without_interventions" in marks:
|
||||
argvalues.append([{"interventions": False, "skip_platforms": skip_platforms}])
|
||||
argvalues.append([dict({"interventions": False}, **otherargs)])
|
||||
ids.append("without_interventions")
|
||||
|
||||
metafunc.parametrize(["session"], argvalues, ids=ids, indirect=True)
|
||||
|
||||
|
||||
class WebDriver:
|
||||
def __init__(self, config):
|
||||
self.browser_binary = config.getoption("browser_binary")
|
||||
self.webdriver_binary = config.getoption("webdriver_binary")
|
||||
self.port = config.getoption("webdriver_port")
|
||||
self.headless = config.getoption("headless")
|
||||
self.proc = None
|
||||
@pytest.fixture(scope="function") # noqa: F405
|
||||
async def test_config(request, driver):
|
||||
params = request.node.callspec.params.get("session")
|
||||
|
||||
def command_line_driver(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def capabilities(self, use_interventions):
|
||||
raise NotImplementedError
|
||||
|
||||
def __enter__(self):
|
||||
assert self.proc is None
|
||||
self.proc = subprocess.Popen(self.command_line_driver())
|
||||
return self
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
self.proc.kill()
|
||||
|
||||
|
||||
class FirefoxWebDriver(WebDriver):
|
||||
def command_line_driver(self):
|
||||
return [self.webdriver_binary, "--port", str(self.port), "-vv"]
|
||||
|
||||
def capabilities(self, use_interventions):
|
||||
fx_options = {"binary": self.browser_binary}
|
||||
|
||||
interventions_prefs = [
|
||||
"perform_injections",
|
||||
"perform_ua_overrides",
|
||||
"enable_shims",
|
||||
"enable_picture_in_picture_overrides",
|
||||
]
|
||||
fx_options["prefs"] = {
|
||||
f"extensions.webcompat.{pref}": use_interventions
|
||||
for pref in interventions_prefs
|
||||
}
|
||||
if self.headless:
|
||||
fx_options["args"] = ["--headless"]
|
||||
|
||||
return {
|
||||
"pageLoadStrategy": "normal",
|
||||
"moz:firefoxOptions": fx_options,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def config_file(request):
|
||||
path = request.config.getoption("config")
|
||||
if not path:
|
||||
return None
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def credentials(request, config_file):
|
||||
bug_number = re.findall("\d+", str(request.fspath.basename))[0]
|
||||
|
||||
if not config_file:
|
||||
pytest.skip(f"login info required for bug #{bug_number}")
|
||||
return None
|
||||
|
||||
try:
|
||||
credentials = config_file[bug_number]
|
||||
except KeyError:
|
||||
pytest.skip(f"no login for bug #{bug_number} found")
|
||||
return
|
||||
|
||||
return {"username": credentials["username"], "password": credentials["password"]}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def driver(pytestconfig):
|
||||
if pytestconfig.getoption("browser") == "firefox":
|
||||
cls = FirefoxWebDriver
|
||||
else:
|
||||
assert False
|
||||
|
||||
with cls(pytestconfig) as driver_instance:
|
||||
yield driver_instance
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def session(request, driver):
|
||||
use_interventions = request.param.get("interventions")
|
||||
use_interventions = params.get("interventions")
|
||||
print(f"use_interventions {use_interventions}")
|
||||
if use_interventions is None:
|
||||
raise ValueError(
|
||||
"Missing intervention marker in %s:%s"
|
||||
% (request.fspath, request.function.__name__)
|
||||
)
|
||||
capabilities = driver.capabilities(use_interventions)
|
||||
print(capabilities)
|
||||
|
||||
url = f"http://localhost:{driver.port}"
|
||||
with Remote(command_executor=url, desired_capabilities=capabilities) as session:
|
||||
yield session
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def skip_platforms(request, session):
|
||||
platform = session.capabilities["platformName"]
|
||||
if request.node.get_closest_marker("skip_platforms"):
|
||||
if request.node.get_closest_marker("skip_platforms").args[0] == platform:
|
||||
pytest.skip(f"Skipped on platform: {platform}")
|
||||
return {
|
||||
"use_interventions": use_interventions,
|
||||
}
|
||||
|
|
|
@ -1,144 +0,0 @@
|
|||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from selenium.common.exceptions import NoAlertPresentException, NoSuchElementException
|
||||
from selenium.webdriver.common.by import By
|
||||
|
||||
|
||||
def expect_alert(session, text=None):
|
||||
try:
|
||||
alert = session.switch_to.alert
|
||||
if text is not None:
|
||||
assert alert.text == text
|
||||
alert.dismiss()
|
||||
except NoAlertPresentException:
|
||||
pass
|
||||
|
||||
|
||||
def is_float_cleared(session, elem1, elem2):
|
||||
return session.execute_script(
|
||||
"""return (function(a, b) {
|
||||
// Ensure that a is placed under
|
||||
// (and not to the right of) b
|
||||
return a?.offsetTop >= b?.offsetTop + b?.offsetHeight &&
|
||||
a?.offsetLeft < b?.offsetLeft + b?.offsetWidth;
|
||||
}(arguments[0], arguments[1]));""",
|
||||
elem1,
|
||||
elem2,
|
||||
)
|
||||
|
||||
|
||||
def await_getUserMedia_call_on_click(session, elem_to_click):
|
||||
return session.execute_script(
|
||||
"""
|
||||
navigator.mediaDevices.getUserMedia =
|
||||
navigator.mozGetUserMedia =
|
||||
navigator.getUserMedia =
|
||||
() => { window.__gumCalled = true; };
|
||||
"""
|
||||
)
|
||||
elem_to_click.click()
|
||||
return session.execute_async_script(
|
||||
"""
|
||||
var done = arguments[0];
|
||||
setInterval(500, () => {
|
||||
if (window.__gumCalled === true) {
|
||||
done();
|
||||
}
|
||||
});
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
class Xpath:
|
||||
by = By.XPATH
|
||||
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
|
||||
class Css:
|
||||
by = By.CSS_SELECTOR
|
||||
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
|
||||
class Text:
|
||||
by = By.XPATH
|
||||
|
||||
def __init__(self, value):
|
||||
self.value = f"//*[contains(text(),'{value}')]"
|
||||
|
||||
|
||||
Missing = object()
|
||||
|
||||
|
||||
def find_elements(session, selector, all=True, default=Missing):
|
||||
try:
|
||||
if all:
|
||||
return session.find_elements(selector.by, selector.value)
|
||||
return session.find_element(selector.by, selector.value)
|
||||
except NoSuchElementException:
|
||||
if default is not Missing:
|
||||
return default
|
||||
raise
|
||||
|
||||
|
||||
def find_element(session, selector, default=Missing):
|
||||
return find_elements(session, selector, all=False, default=default)
|
||||
|
||||
|
||||
def assert_not_element(session, sel):
|
||||
with pytest.raises(NoSuchElementException):
|
||||
find_element(session, sel)
|
||||
|
||||
|
||||
def await_first_element_of(session, selectors, timeout=None, is_displayed=False):
|
||||
t0 = time.time()
|
||||
exc = None
|
||||
if timeout is None:
|
||||
timeout = 10
|
||||
found = [None for sel in selectors]
|
||||
while time.time() < t0 + timeout:
|
||||
for i, selector in enumerate(selectors):
|
||||
try:
|
||||
ele = find_element(session, selector)
|
||||
if not is_displayed or ele.is_displayed():
|
||||
found[i] = ele
|
||||
return found
|
||||
except NoSuchElementException as e:
|
||||
exc = e
|
||||
time.sleep(0.5)
|
||||
raise exc if exc is not None else NoSuchElementException
|
||||
return found
|
||||
|
||||
|
||||
def await_element(session, selector, timeout=None, default=None):
|
||||
return await_first_element_of(session, [selector], timeout, default)[0]
|
||||
|
||||
|
||||
def load_page_and_wait_for_iframe(session, url, selector, loads=1, timeout=None):
|
||||
while loads > 0:
|
||||
session.get(url)
|
||||
frame = await_element(session, selector, timeout=timeout)
|
||||
loads -= 1
|
||||
session.switch_to.frame(frame)
|
||||
return frame
|
||||
|
||||
|
||||
def await_dom_ready(session):
|
||||
session.execute_async_script(
|
||||
"""
|
||||
const cb = arguments[0];
|
||||
setInterval(() => {
|
||||
if (document.readyState === "complete") {
|
||||
cb();
|
||||
}
|
||||
}, 500);
|
||||
"""
|
||||
)
|
|
@ -42,7 +42,25 @@ def create_parser_interventions():
|
|||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--binary", help="Path to browser binary")
|
||||
parser.add_argument("--webdriver-binary", help="Path to webdriver binary")
|
||||
parser.add_argument(
|
||||
"--webdriver-port",
|
||||
action="store",
|
||||
default="4444",
|
||||
help="Port on which to run WebDriver",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--webdriver-ws-port",
|
||||
action="store",
|
||||
default="9222",
|
||||
help="Port on which to run WebDriver BiDi websocket",
|
||||
)
|
||||
parser.add_argument("--bug", help="Bug to run tests for")
|
||||
parser.add_argument(
|
||||
"--do2fa",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Do two-factor auth live in supporting tests",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--config", help="Path to JSON file containing logins and other settings"
|
||||
)
|
||||
|
@ -59,9 +77,16 @@ def create_parser_interventions():
|
|||
"--interventions",
|
||||
action="store",
|
||||
default="both",
|
||||
choices=["enabled", "disabled", "both"],
|
||||
choices=["enabled", "disabled", "both", "none"],
|
||||
help="Enable webcompat interventions",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--shims",
|
||||
action="store",
|
||||
default="none",
|
||||
choices=["enabled", "disabled", "both", "none"],
|
||||
help="Enable SmartBlock shims",
|
||||
)
|
||||
commandline.add_logging_group(parser)
|
||||
return parser
|
||||
|
||||
|
@ -149,25 +174,52 @@ class InterventionTest(MozbuildObject):
|
|||
|
||||
self.set_default_kwargs(logger, kwargs)
|
||||
|
||||
interventions = (
|
||||
["enabled", "disabled"]
|
||||
if kwargs["interventions"] == "both"
|
||||
else [kwargs["interventions"]]
|
||||
)
|
||||
|
||||
for interventions_setting in interventions:
|
||||
runner.run(
|
||||
logger,
|
||||
os.path.join(here, "interventions"),
|
||||
kwargs["binary"],
|
||||
kwargs["webdriver_binary"],
|
||||
bug=kwargs["bug"],
|
||||
debug=kwargs["debug"],
|
||||
interventions=interventions_setting,
|
||||
config=kwargs["config"],
|
||||
headless=kwargs["headless"],
|
||||
if kwargs["interventions"] != "none":
|
||||
interventions = (
|
||||
["enabled", "disabled"]
|
||||
if kwargs["interventions"] == "both"
|
||||
else [kwargs["interventions"]]
|
||||
)
|
||||
|
||||
for interventions_setting in interventions:
|
||||
runner.run(
|
||||
logger,
|
||||
os.path.join(here, "interventions"),
|
||||
kwargs["binary"],
|
||||
kwargs["webdriver_binary"],
|
||||
kwargs["webdriver_port"],
|
||||
kwargs["webdriver_ws_port"],
|
||||
bug=kwargs["bug"],
|
||||
debug=kwargs["debug"],
|
||||
interventions=interventions_setting,
|
||||
config=kwargs["config"],
|
||||
headless=kwargs["headless"],
|
||||
do2fa=kwargs["do2fa"],
|
||||
)
|
||||
|
||||
if kwargs["shims"] != "none":
|
||||
shims = (
|
||||
["enabled", "disabled"]
|
||||
if kwargs["shims"] == "both"
|
||||
else [kwargs["shims"]]
|
||||
)
|
||||
|
||||
for shims_setting in shims:
|
||||
runner.run(
|
||||
logger,
|
||||
os.path.join(here, "shims"),
|
||||
kwargs["binary"],
|
||||
kwargs["webdriver_binary"],
|
||||
kwargs["webdriver_port"],
|
||||
kwargs["webdriver_ws_port"],
|
||||
bug=kwargs["bug"],
|
||||
debug=kwargs["debug"],
|
||||
shims=shims_setting,
|
||||
config=kwargs["config"],
|
||||
headless=kwargs["headless"],
|
||||
do2fa=kwargs["do2fa"],
|
||||
)
|
||||
|
||||
summary = status_handler.summarize()
|
||||
passed = (
|
||||
summary.unexpected_statuses == 0
|
||||
|
@ -185,5 +237,11 @@ class InterventionTest(MozbuildObject):
|
|||
virtualenv_name="webcompat",
|
||||
)
|
||||
def test_interventions(command_context, **params):
|
||||
here = os.path.abspath(os.path.dirname(__file__))
|
||||
command_context.virtualenv_manager.activate()
|
||||
command_context.virtualenv_manager.install_pip_requirements(
|
||||
os.path.join(here, "requirements.txt"),
|
||||
require_hashes=False,
|
||||
)
|
||||
intervention_test = command_context._spawn(InterventionTest)
|
||||
return 0 if intervention_test.run(**params) else 1
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
pytest==4.6.6
|
||||
selenium==3.141.0
|
||||
asyncio==3.4.3
|
||||
pytest-asyncio==0.16.0
|
||||
urllib3==1.26
|
||||
|
|
|
@ -15,12 +15,16 @@ def run(
|
|||
path,
|
||||
browser_binary,
|
||||
webdriver_binary,
|
||||
webdriver_port,
|
||||
webdriver_ws_port,
|
||||
environ=None,
|
||||
bug=None,
|
||||
debug=False,
|
||||
interventions=None,
|
||||
shims=None,
|
||||
config=None,
|
||||
headless=False,
|
||||
do2fa=False,
|
||||
):
|
||||
""""""
|
||||
old_environ = os.environ.copy()
|
||||
|
@ -53,6 +57,10 @@ def run(
|
|||
browser_binary,
|
||||
"--webdriver-binary",
|
||||
webdriver_binary,
|
||||
"--webdriver-port",
|
||||
webdriver_port,
|
||||
"--webdriver-ws-port",
|
||||
webdriver_ws_port,
|
||||
]
|
||||
|
||||
if debug:
|
||||
|
@ -61,25 +69,50 @@ def run(
|
|||
if headless:
|
||||
args.append("--headless")
|
||||
|
||||
if bug:
|
||||
args.append("--bug")
|
||||
args.append(bug)
|
||||
|
||||
if do2fa:
|
||||
args.append("--do2fa")
|
||||
|
||||
if config:
|
||||
args.append("--config")
|
||||
args.append(config)
|
||||
|
||||
if interventions is not None and shims is not None:
|
||||
raise ValueError(
|
||||
"Must provide only one of interventions or shims argument"
|
||||
)
|
||||
elif interventions is None and shims is None:
|
||||
raise ValueError(
|
||||
"Must provide either an interventions or shims argument"
|
||||
)
|
||||
|
||||
name = "webcompat-interventions"
|
||||
if interventions == "enabled":
|
||||
args.extend(["-m", "with_interventions"])
|
||||
elif interventions == "disabled":
|
||||
args.extend(["-m", "without_interventions"])
|
||||
elif interventions is not None:
|
||||
raise ValueError(f"Invalid value for interventions {interventions}")
|
||||
if shims == "enabled":
|
||||
args.extend(["-m", "with_shims"])
|
||||
name = "smartblock-shims"
|
||||
elif shims == "disabled":
|
||||
args.extend(["-m", "without_shims"])
|
||||
name = "smartblock-shims"
|
||||
elif shims is not None:
|
||||
raise ValueError(f"Invalid value for shims {shims}")
|
||||
else:
|
||||
raise ValueError("Must provide interventions argument")
|
||||
name = "smartblock-shims"
|
||||
|
||||
if bug is not None:
|
||||
args.extend(["-k", bug])
|
||||
|
||||
args.append(path)
|
||||
try:
|
||||
logger.suite_start([], name="webcompat-interventions")
|
||||
logger.suite_start([], name=name)
|
||||
pytest.main(args, plugins=[config_plugin, result_recorder])
|
||||
except Exception as e:
|
||||
logger.critical(str(e))
|
||||
|
@ -101,13 +134,25 @@ class WDConfig:
|
|||
parser.addoption(
|
||||
"--webdriver-port",
|
||||
action="store",
|
||||
default=4444,
|
||||
default="4444",
|
||||
help="Port on which to run WebDriver",
|
||||
)
|
||||
parser.addoption(
|
||||
"--webdriver-ws-port",
|
||||
action="store",
|
||||
default="9222",
|
||||
help="Port on which to run WebDriver BiDi websocket",
|
||||
)
|
||||
parser.addoption(
|
||||
"--browser", action="store", choices=["firefox"], help="Name of the browser"
|
||||
)
|
||||
parser.addoption("--bug", action="store", help="Bug number to run tests for")
|
||||
parser.addoption(
|
||||
"--do2fa",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Do two-factor auth live in supporting tests",
|
||||
)
|
||||
parser.addoption(
|
||||
"--config",
|
||||
action="store",
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
|
||||
from ..fixtures import * # noqa: F403
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc):
|
||||
"""Generate tests based on markers."""
|
||||
|
||||
if "session" not in metafunc.fixturenames:
|
||||
return
|
||||
|
||||
marks = [mark.name for mark in metafunc.function.pytestmark]
|
||||
|
||||
otherargs = {}
|
||||
argvalues = []
|
||||
ids = []
|
||||
|
||||
if "skip_platforms" in marks:
|
||||
for mark in metafunc.function.pytestmark:
|
||||
if mark.name == "skip_platforms":
|
||||
otherargs["skip_platforms"] = mark.args
|
||||
|
||||
if "with_private_browsing" in marks:
|
||||
otherargs["with_private_browsing"] = True
|
||||
if "with_strict_etp" in marks:
|
||||
otherargs["with_strict_etp"] = True
|
||||
if "without_storage_partitioning" in marks:
|
||||
otherargs["without_storage_partitioning"] = True
|
||||
if "without_tcp " in marks:
|
||||
otherargs["without_tcp "] = True
|
||||
|
||||
if "with_shims" in marks:
|
||||
argvalues.append([dict({"shims": True}, **otherargs)])
|
||||
ids.append("with_shims")
|
||||
|
||||
if "without_shims" in marks:
|
||||
argvalues.append([dict({"shims": False}, **otherargs)])
|
||||
ids.append("without_shims")
|
||||
|
||||
metafunc.parametrize(["session"], argvalues, ids=ids, indirect=True)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function") # noqa: F405
|
||||
async def test_config(request, driver):
|
||||
params = request.node.callspec.params.get("session")
|
||||
|
||||
use_shims = params.get("shims")
|
||||
if use_shims is None:
|
||||
raise ValueError(
|
||||
"Missing shims marker in %s:%s"
|
||||
% (request.fspath, request.function.__name__)
|
||||
)
|
||||
|
||||
return {
|
||||
"aps": not params.get("without_storage_partitioning", False),
|
||||
"use_pbm": params.get("with_private_browsing", False),
|
||||
"use_shims": use_shims,
|
||||
"use_strict_etp": params.get("with_strict_etp", False),
|
||||
"without_tcp": params.get("without_tcp", False),
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
[pytest]
|
||||
console_output_style = classic
|
||||
markers =
|
||||
skip_platforms: skip tests on specific platforms (mac, linux, windows)
|
||||
with_shims: enable web-compat shims
|
||||
without_shims: disable web-compat shims
|
||||
without_storage_partitioning: disable partitioning of non-cookie third-party web storage
|
||||
with_private_browsing: run test in a private browsing window
|
||||
with_strict_etp: enable strict ETP mode
|
||||
without_tcp: disable Total Cookie Protection
|
|
@ -0,0 +1,42 @@
|
|||
GPT_SHIM_MSG = "Google Publisher Tags is being shimmed by Firefox"
|
||||
GAGTA_SHIM_MSG = "Google Analytics and Tag Manager is being shimmed by Firefox"
|
||||
|
||||
|
||||
async def is_gtag_placeholder_displayed(helper, url, finder, **kwargs):
|
||||
await helper.navigate(url, **kwargs)
|
||||
await helper.await_element(finder)
|
||||
helper.execute_async_script(
|
||||
"""
|
||||
const done = arguments[0];
|
||||
if (window.dataLayer?.push?.toString() === [].push.toString()) {
|
||||
return done();
|
||||
}
|
||||
setTimeout(() => {
|
||||
dataLayer.push({
|
||||
event: "datalayerReady",
|
||||
eventTimeout: 1,
|
||||
eventCallback: done,
|
||||
});
|
||||
}, 100);
|
||||
"""
|
||||
)
|
||||
return helper.find_element(finder).is_displayed()
|
||||
|
||||
|
||||
async def clicking_link_navigates(helper, url, finder, **kwargs):
|
||||
await helper.navigate(url, **kwargs)
|
||||
elem = await helper.await_element(finder)
|
||||
return helper.session.execute_async_script(
|
||||
"""
|
||||
const elem = arguments[0],
|
||||
done = arguments[1];
|
||||
window.onbeforeunload = function() {
|
||||
done(true);
|
||||
};
|
||||
elem.click();
|
||||
setTimeout(() => {
|
||||
done(false);
|
||||
}, 1000);
|
||||
""",
|
||||
args=[elem],
|
||||
)
|
|
@ -36,6 +36,7 @@ license:
|
|||
- mobile/android/geckoview/src/main/res/drawable/ic_generic_file.xml
|
||||
- mobile/android/geckoview_example/src/main
|
||||
- testing/webcompat/interventions/
|
||||
- testing/webcompat/shims/
|
||||
# might not work with license
|
||||
- mobile/android/gradle/dotgradle-offline/gradle.properties
|
||||
# might not work with license
|
||||
|
|
Загрузка…
Ссылка в новой задаче