From 262e4f28479ea5a393f4ae2bf2c1bd0748e7441f Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Wed, 28 Apr 2021 14:45:36 +0200 Subject: [PATCH] Browser manager as class (#901) * Refactored BrowserManager into class * Completing refactor * Improved type annotations --- openwpm/browser_manager.py | 271 ++++++++++++++++++++----------------- 1 file changed, 149 insertions(+), 122 deletions(-) diff --git a/openwpm/browser_manager.py b/openwpm/browser_manager.py index c90b7c73..7755ea2d 100644 --- a/openwpm/browser_manager.py +++ b/openwpm/browser_manager.py @@ -12,10 +12,10 @@ import time import traceback from pathlib import Path from queue import Empty as EmptyQueue -from typing import TYPE_CHECKING, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union import psutil -from multiprocess import Queue +from multiprocess import Process, Queue from selenium.common.exceptions import WebDriverException from tblib import Traceback, pickling_support @@ -31,7 +31,6 @@ from .socket_interface import ClientSocket from .storage.storage_providers import TableName from .types import BrowserId, VisitId from .utilities.multiprocess_utils import ( - Process, kill_process_and_children, parse_traceback_for_sentry, ) @@ -103,11 +102,13 @@ class BrowserManagerHandle: def set_visit_id(self, visit_id): self.curr_visit_id = visit_id - def launch_browser_manager(self): + def launch_browser_manager(self) -> bool: """ sets up the BrowserManager and gets the process id, browser pid and, if applicable, screen pid. loads associated user profile if necessary """ + tempdir: Optional[str] = None + crash_recovery = False # if this is restarting from a crash, update the tar location # to be a tar of the crashed browser's history if self.current_profile_path is not None: @@ -126,9 +127,6 @@ class BrowserManagerHandle: self.browser_params.recovery_tar = tar_path crash_recovery = True - else: - tempdir = None - crash_recovery = False self.logger.info("BROWSER %i: Launching browser..." % self.browser_id) self.is_fresh = not crash_recovery @@ -137,7 +135,8 @@ class BrowserManagerHandle: unsuccessful_spawns = 0 success = False - def check_queue(launch_status): + def check_queue(launch_status: Dict[str, bool]) -> Any: + assert self.status_queue is not None result = self.status_queue.get(True, self._SPAWN_TIMEOUT) if result[0] == "STATUS": launch_status[result[1]] = True @@ -156,19 +155,19 @@ class BrowserManagerHandle: (self.command_queue, self.status_queue) = (Queue(), Queue()) # builds and launches the browser_manager - args = ( + + self.browser_manager = BrowserManager( self.command_queue, self.status_queue, self.browser_params, self.manager_params, crash_recovery, ) - self.browser_manager = Process(target=BrowserManager, args=args) self.browser_manager.daemon = True self.browser_manager.start() # Read success status of browser manager - launch_status = dict() + launch_status: Dict[str, bool] = dict() try: # 1. Browser profile created browser_profile_path = check_queue(launch_status) @@ -618,9 +617,7 @@ class BrowserManagerHandle: shutil.rmtree(self.current_profile_path, ignore_errors=True) -def BrowserManager( - command_queue, status_queue, browser_params, manager_params, crash_recovery -): +class BrowserManager(Process): """ The BrowserManager function runs in each new browser process. It is responsible for listening to command instructions from @@ -628,120 +625,150 @@ def BrowserManager( and interface with Selenium. Command execution status is sent back to the TaskManager. """ - logger = logging.getLogger("openwpm") - display = None - try: - # Start Xvfb (if necessary), webdriver, and browser - driver, browser_profile_path, display = deploy_firefox.deploy_firefox( - status_queue, browser_params, manager_params, crash_recovery - ) - # Read the extension port -- if extension is enabled - # TODO: Initial communication from extension to TM should use sockets - if browser_params.extension_enabled: - logger.debug( - "BROWSER %i: Looking for extension port information " - "in %s" % (browser_params.browser_id, browser_profile_path) - ) - elapsed = 0 - port = None - ep_filename = browser_profile_path / "extension_port.txt" - while elapsed < 5: - try: - with open(ep_filename, "rt") as f: - port = int(f.read().strip()) - break - except IOError as e: - if e.errno != errno.ENOENT: - raise - time.sleep(0.1) - elapsed += 0.1 - if port is None: - # try one last time, allowing all exceptions to propagate + def __init__( + self, + command_queue: Queue, + status_queue: Queue, + browser_params: BrowserParamsInternal, + manager_params: ManagerParamsInternal, + crash_recovery: bool, + ) -> None: + super().__init__() + self.logger = logging.getLogger("openwpm") + self.command_queue = command_queue + self.status_queue = status_queue + self.browser_params = browser_params + self.manager_params = manager_params + self.crash_recovery = crash_recovery + + def _start_extension(self, browser_profile_path: Path) -> ClientSocket: + assert self.browser_params.browser_id is not None + self.logger.debug( + "BROWSER %i: Looking for extension port information " + "in %s" % (self.browser_params.browser_id, browser_profile_path) + ) + elapsed = 0.0 + port = None + ep_filename = browser_profile_path / "extension_port.txt" + while elapsed < 5: + try: with open(ep_filename, "rt") as f: port = int(f.read().strip()) + break + except IOError as e: + if e.errno != errno.ENOENT: + raise + time.sleep(0.1) + elapsed += 0.1 + if port is None: + # try one last time, allowing all exceptions to propagate + with open(ep_filename, "rt") as f: + port = int(f.read().strip()) - logger.debug( - "BROWSER %i: Connecting to extension on port %i" - % (browser_params.browser_id, port) - ) - extension_socket = ClientSocket(serialization="json") - extension_socket.connect("127.0.0.1", int(port)) - else: - extension_socket = None + self.logger.debug( + "BROWSER %i: Connecting to extension on port %i" + % (self.browser_params.browser_id, port) + ) + extension_socket = ClientSocket(serialization="json") + extension_socket.connect("127.0.0.1", int(port)) + return extension_socket - logger.debug("BROWSER %i: BrowserManager ready." % browser_params.browser_id) - - # passes "READY" to the TaskManager to signal a successful startup - status_queue.put(("STATUS", "Browser Ready", "READY")) - browser_params.profile_path = browser_profile_path - - # starts accepting arguments until told to die - while True: - # no command for now -> sleep to avoid pegging CPU on blocking get - if command_queue.empty(): - time.sleep(0.001) - continue - - command: Union[ShutdownSignal, BaseCommand] = command_queue.get() - - if type(command) is ShutdownSignal: - driver.quit() - status_queue.put("OK") - return - - logger.info( - "BROWSER %i: EXECUTING COMMAND: %s" - % (browser_params.browser_id, str(command)) + def run(self) -> None: + assert self.browser_params.browser_id is not None + display = None + try: + # Start Xvfb (if necessary), webdriver, and browser + driver, browser_profile_path, display = deploy_firefox.deploy_firefox( + self.status_queue, + self.browser_params, + self.manager_params, + self.crash_recovery, ) - # attempts to perform an action and return an OK signal - # if command fails for whatever reason, tell the TaskManager to - # kill and restart its worker processes - try: - command.execute( - driver, - browser_params, - manager_params, - extension_socket, - ) - status_queue.put("OK") - except WebDriverException: - # We handle WebDriverExceptions separately here because they - # are quite common, and we often still have a handle to the - # browser, allowing us to run the SHUTDOWN command. - tb = traceback.format_exception(*sys.exc_info()) - if "about:neterror" in tb[-1]: - status_queue.put(("NETERROR", pickle.dumps(sys.exc_info()))) + extension_socket: Optional[ClientSocket] = None + + if self.browser_params.extension_enabled: + extension_socket = self._start_extension(browser_profile_path) + + self.logger.debug( + "BROWSER %i: BrowserManager ready." % self.browser_params.browser_id + ) + + # passes "READY" to the TaskManager to signal a successful startup + self.status_queue.put(("STATUS", "Browser Ready", "READY")) + self.browser_params.profile_path = browser_profile_path + + assert extension_socket is not None + # starts accepting arguments until told to die + while True: + # no command for now -> sleep to avoid pegging CPU on blocking get + if self.command_queue.empty(): + time.sleep(0.001) continue - extra = parse_traceback_for_sentry(tb) - extra["exception"] = tb[-1] - logger.error( - "BROWSER %i: WebDriverException while executing command" - % browser_params.browser_id, - exc_info=True, - extra=extra, - ) - status_queue.put(("FAILED", pickle.dumps(sys.exc_info()))) - except (ProfileLoadError, BrowserConfigError, AssertionError) as e: - logger.error( - "BROWSER %i: %s thrown, informing parent and raising" - % (browser_params.browser_id, e.__class__.__name__) - ) - status_queue.put(("CRITICAL", pickle.dumps(sys.exc_info()))) - except Exception: - tb = traceback.format_exception(*sys.exc_info()) - extra = parse_traceback_for_sentry(tb) - extra["exception"] = tb[-1] - logger.error( - "BROWSER %i: Crash in driver, restarting browser manager" - % browser_params.browser_id, - exc_info=True, - extra=extra, - ) - status_queue.put(("FAILED", pickle.dumps(sys.exc_info()))) - finally: - if display is not None: - display.stop() - return + command: Union[ShutdownSignal, BaseCommand] = self.command_queue.get() + + if isinstance(command, ShutdownSignal): + driver.quit() + self.status_queue.put("OK") + return + + assert isinstance(command, BaseCommand) + self.logger.info( + "BROWSER %i: EXECUTING COMMAND: %s" + % (self.browser_params.browser_id, str(command)) + ) + + # attempts to perform an action and return an OK signal + # if command fails for whatever reason, tell the TaskManager to + # kill and restart its worker processes + try: + command.execute( + driver, + self.browser_params, + self.manager_params, + extension_socket, + ) + self.status_queue.put("OK") + except WebDriverException: + # We handle WebDriverExceptions separately here because they + # are quite common, and we often still have a handle to the + # browser, allowing us to run the SHUTDOWN command. + tb = traceback.format_exception(*sys.exc_info()) + if "about:neterror" in tb[-1]: + self.status_queue.put( + ("NETERROR", pickle.dumps(sys.exc_info())) + ) + continue + extra = parse_traceback_for_sentry(tb) + extra["exception"] = tb[-1] + self.logger.error( + "BROWSER %i: WebDriverException while executing command" + % self.browser_params.browser_id, + exc_info=True, + extra=extra, + ) + self.status_queue.put(("FAILED", pickle.dumps(sys.exc_info()))) + + except (ProfileLoadError, BrowserConfigError, AssertionError) as e: + self.logger.error( + "BROWSER %i: %s thrown, informing parent and raising" + % (self.browser_params.browser_id, e.__class__.__name__) + ) + self.status_queue.put(("CRITICAL", pickle.dumps(sys.exc_info()))) + except Exception: + tb = traceback.format_exception(*sys.exc_info()) + extra = parse_traceback_for_sentry(tb) + extra["exception"] = tb[-1] + self.logger.error( + "BROWSER %i: Crash in driver, restarting browser manager" + % self.browser_params.browser_id, + exc_info=True, + extra=extra, + ) + self.status_queue.put(("FAILED", pickle.dumps(sys.exc_info()))) + finally: + if display is not None: + display.stop() + return