* Refactored BrowserManager into class

* Completing refactor

* Improved type annotations
This commit is contained in:
Stefan Zabka 2021-04-28 14:45:36 +02:00 коммит произвёл GitHub
Родитель 3619b55682
Коммит 262e4f2847
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
1 изменённых файлов: 149 добавлений и 122 удалений

Просмотреть файл

@ -12,10 +12,10 @@ import time
import traceback
from pathlib import Path
from queue import Empty as EmptyQueue
from typing import TYPE_CHECKING, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
import psutil
from multiprocess import Queue
from multiprocess import Process, Queue
from selenium.common.exceptions import WebDriverException
from tblib import Traceback, pickling_support
@ -31,7 +31,6 @@ from .socket_interface import ClientSocket
from .storage.storage_providers import TableName
from .types import BrowserId, VisitId
from .utilities.multiprocess_utils import (
Process,
kill_process_and_children,
parse_traceback_for_sentry,
)
@ -103,11 +102,13 @@ class BrowserManagerHandle:
def set_visit_id(self, visit_id) -> None:
    """Record ``visit_id`` as the current visit id of this handle."""
    self.curr_visit_id = visit_id
def launch_browser_manager(self):
def launch_browser_manager(self) -> bool:
"""
sets up the BrowserManager and gets the process id, browser pid and,
if applicable, screen pid. loads associated user profile if necessary
"""
tempdir: Optional[str] = None
crash_recovery = False
# if this is restarting from a crash, update the tar location
# to be a tar of the crashed browser's history
if self.current_profile_path is not None:
@ -126,9 +127,6 @@ class BrowserManagerHandle:
self.browser_params.recovery_tar = tar_path
crash_recovery = True
else:
tempdir = None
crash_recovery = False
self.logger.info("BROWSER %i: Launching browser..." % self.browser_id)
self.is_fresh = not crash_recovery
@ -137,7 +135,8 @@ class BrowserManagerHandle:
unsuccessful_spawns = 0
success = False
def check_queue(launch_status):
def check_queue(launch_status: Dict[str, bool]) -> Any:
assert self.status_queue is not None
result = self.status_queue.get(True, self._SPAWN_TIMEOUT)
if result[0] == "STATUS":
launch_status[result[1]] = True
@ -156,19 +155,19 @@ class BrowserManagerHandle:
(self.command_queue, self.status_queue) = (Queue(), Queue())
# builds and launches the browser_manager
args = (
self.browser_manager = BrowserManager(
self.command_queue,
self.status_queue,
self.browser_params,
self.manager_params,
crash_recovery,
)
self.browser_manager = Process(target=BrowserManager, args=args)
self.browser_manager.daemon = True
self.browser_manager.start()
# Read success status of browser manager
launch_status = dict()
launch_status: Dict[str, bool] = dict()
try:
# 1. Browser profile created
browser_profile_path = check_queue(launch_status)
@ -618,9 +617,7 @@ class BrowserManagerHandle:
shutil.rmtree(self.current_profile_path, ignore_errors=True)
def BrowserManager(
command_queue, status_queue, browser_params, manager_params, crash_recovery
):
class BrowserManager(Process):
"""
The BrowserManager class runs in each new browser process.
It is responsible for listening to command instructions from
@ -628,120 +625,150 @@ def BrowserManager(
and interface with Selenium. Command execution status is sent back
to the TaskManager.
"""
logger = logging.getLogger("openwpm")
display = None
try:
# Start Xvfb (if necessary), webdriver, and browser
driver, browser_profile_path, display = deploy_firefox.deploy_firefox(
status_queue, browser_params, manager_params, crash_recovery
)
# Read the extension port -- if extension is enabled
# TODO: Initial communication from extension to TM should use sockets
if browser_params.extension_enabled:
logger.debug(
"BROWSER %i: Looking for extension port information "
"in %s" % (browser_params.browser_id, browser_profile_path)
)
elapsed = 0
port = None
ep_filename = browser_profile_path / "extension_port.txt"
while elapsed < 5:
try:
with open(ep_filename, "rt") as f:
port = int(f.read().strip())
break
except IOError as e:
if e.errno != errno.ENOENT:
raise
time.sleep(0.1)
elapsed += 0.1
if port is None:
# try one last time, allowing all exceptions to propagate
def __init__(
    self,
    command_queue: Queue,
    status_queue: Queue,
    browser_params: BrowserParamsInternal,
    manager_params: ManagerParamsInternal,
    crash_recovery: bool,
) -> None:
    """Keep the launch arguments on the process object.

    Nothing is started here: the base ``Process`` is initialised and
    every argument is stored under an attribute of the same name.

    Args:
        command_queue: Kept as ``self.command_queue``.
        status_queue: Kept as ``self.status_queue``.
        browser_params: Kept as ``self.browser_params``.
        manager_params: Kept as ``self.manager_params``.
        crash_recovery: Kept as ``self.crash_recovery``.
    """
    super().__init__()
    # Configuration values.
    self.crash_recovery = crash_recovery
    self.manager_params = manager_params
    self.browser_params = browser_params
    # Communication channels.
    self.status_queue, self.command_queue = status_queue, command_queue
    # Shared "openwpm" logger.
    self.logger = logging.getLogger("openwpm")
def _start_extension(self, browser_profile_path: Path) -> ClientSocket:
assert self.browser_params.browser_id is not None
self.logger.debug(
"BROWSER %i: Looking for extension port information "
"in %s" % (self.browser_params.browser_id, browser_profile_path)
)
elapsed = 0.0
port = None
ep_filename = browser_profile_path / "extension_port.txt"
while elapsed < 5:
try:
with open(ep_filename, "rt") as f:
port = int(f.read().strip())
break
except IOError as e:
if e.errno != errno.ENOENT:
raise
time.sleep(0.1)
elapsed += 0.1
if port is None:
# try one last time, allowing all exceptions to propagate
with open(ep_filename, "rt") as f:
port = int(f.read().strip())
logger.debug(
"BROWSER %i: Connecting to extension on port %i"
% (browser_params.browser_id, port)
)
extension_socket = ClientSocket(serialization="json")
extension_socket.connect("127.0.0.1", int(port))
else:
extension_socket = None
self.logger.debug(
"BROWSER %i: Connecting to extension on port %i"
% (self.browser_params.browser_id, port)
)
extension_socket = ClientSocket(serialization="json")
extension_socket.connect("127.0.0.1", int(port))
return extension_socket
logger.debug("BROWSER %i: BrowserManager ready." % browser_params.browser_id)
# passes "READY" to the TaskManager to signal a successful startup
status_queue.put(("STATUS", "Browser Ready", "READY"))
browser_params.profile_path = browser_profile_path
# starts accepting arguments until told to die
while True:
# no command for now -> sleep to avoid pegging CPU on blocking get
if command_queue.empty():
time.sleep(0.001)
continue
command: Union[ShutdownSignal, BaseCommand] = command_queue.get()
if type(command) is ShutdownSignal:
driver.quit()
status_queue.put("OK")
return
logger.info(
"BROWSER %i: EXECUTING COMMAND: %s"
% (browser_params.browser_id, str(command))
def run(self) -> None:
assert self.browser_params.browser_id is not None
display = None
try:
# Start Xvfb (if necessary), webdriver, and browser
driver, browser_profile_path, display = deploy_firefox.deploy_firefox(
self.status_queue,
self.browser_params,
self.manager_params,
self.crash_recovery,
)
# attempts to perform an action and return an OK signal
# if command fails for whatever reason, tell the TaskManager to
# kill and restart its worker processes
try:
command.execute(
driver,
browser_params,
manager_params,
extension_socket,
)
status_queue.put("OK")
except WebDriverException:
# We handle WebDriverExceptions separately here because they
# are quite common, and we often still have a handle to the
# browser, allowing us to run the SHUTDOWN command.
tb = traceback.format_exception(*sys.exc_info())
if "about:neterror" in tb[-1]:
status_queue.put(("NETERROR", pickle.dumps(sys.exc_info())))
extension_socket: Optional[ClientSocket] = None
if self.browser_params.extension_enabled:
extension_socket = self._start_extension(browser_profile_path)
self.logger.debug(
"BROWSER %i: BrowserManager ready." % self.browser_params.browser_id
)
# passes "READY" to the TaskManager to signal a successful startup
self.status_queue.put(("STATUS", "Browser Ready", "READY"))
self.browser_params.profile_path = browser_profile_path
assert extension_socket is not None
# starts accepting arguments until told to die
while True:
# no command for now -> sleep to avoid pegging CPU on blocking get
if self.command_queue.empty():
time.sleep(0.001)
continue
extra = parse_traceback_for_sentry(tb)
extra["exception"] = tb[-1]
logger.error(
"BROWSER %i: WebDriverException while executing command"
% browser_params.browser_id,
exc_info=True,
extra=extra,
)
status_queue.put(("FAILED", pickle.dumps(sys.exc_info())))
except (ProfileLoadError, BrowserConfigError, AssertionError) as e:
logger.error(
"BROWSER %i: %s thrown, informing parent and raising"
% (browser_params.browser_id, e.__class__.__name__)
)
status_queue.put(("CRITICAL", pickle.dumps(sys.exc_info())))
except Exception:
tb = traceback.format_exception(*sys.exc_info())
extra = parse_traceback_for_sentry(tb)
extra["exception"] = tb[-1]
logger.error(
"BROWSER %i: Crash in driver, restarting browser manager"
% browser_params.browser_id,
exc_info=True,
extra=extra,
)
status_queue.put(("FAILED", pickle.dumps(sys.exc_info())))
finally:
if display is not None:
display.stop()
return
command: Union[ShutdownSignal, BaseCommand] = self.command_queue.get()
if isinstance(command, ShutdownSignal):
driver.quit()
self.status_queue.put("OK")
return
assert isinstance(command, BaseCommand)
self.logger.info(
"BROWSER %i: EXECUTING COMMAND: %s"
% (self.browser_params.browser_id, str(command))
)
# attempts to perform an action and return an OK signal
# if command fails for whatever reason, tell the TaskManager to
# kill and restart its worker processes
try:
command.execute(
driver,
self.browser_params,
self.manager_params,
extension_socket,
)
self.status_queue.put("OK")
except WebDriverException:
# We handle WebDriverExceptions separately here because they
# are quite common, and we often still have a handle to the
# browser, allowing us to run the SHUTDOWN command.
tb = traceback.format_exception(*sys.exc_info())
if "about:neterror" in tb[-1]:
self.status_queue.put(
("NETERROR", pickle.dumps(sys.exc_info()))
)
continue
extra = parse_traceback_for_sentry(tb)
extra["exception"] = tb[-1]
self.logger.error(
"BROWSER %i: WebDriverException while executing command"
% self.browser_params.browser_id,
exc_info=True,
extra=extra,
)
self.status_queue.put(("FAILED", pickle.dumps(sys.exc_info())))
except (ProfileLoadError, BrowserConfigError, AssertionError) as e:
self.logger.error(
"BROWSER %i: %s thrown, informing parent and raising"
% (self.browser_params.browser_id, e.__class__.__name__)
)
self.status_queue.put(("CRITICAL", pickle.dumps(sys.exc_info())))
except Exception:
tb = traceback.format_exception(*sys.exc_info())
extra = parse_traceback_for_sentry(tb)
extra["exception"] = tb[-1]
self.logger.error(
"BROWSER %i: Crash in driver, restarting browser manager"
% self.browser_params.browser_id,
exc_info=True,
extra=extra,
)
self.status_queue.put(("FAILED", pickle.dumps(sys.exc_info())))
finally:
if display is not None:
display.stop()
return