Propagating the success of a visit to the callback

This commit is contained in:
vringar 2020-05-08 13:48:16 +02:00
Родитель 672c5c128f
Коммит 1eb9490173
3 изменённых файлов: 26 добавлений и 15 удалений

Просмотреть файл

@ -25,7 +25,8 @@ class CommandSequence:
def __init__(self, url: str, reset: bool = False,
blocking: bool = False, retry_number: int = None,
site_rank: int = None, callback: Callable[[], None] = None):
site_rank: int = None,
callback: Callable[[bool], None] = None):
"""Initialize command sequence.
Parameters
@ -42,6 +43,12 @@ class CommandSequence:
site_rank : int, optional
Integer indicating the ranking of the page to visit, saved
to `site_visits`
callable :
A callback to be invoked once all data regarding this
CommandSequence has been saved out or it has been interrupted
The function will be passed a bool that's true if everything has
been successfully saved out if it's false something bad happened
and no or only incomplete data may have been saved out
"""
self.url = url
self.reset = reset
@ -157,9 +164,9 @@ class CommandSequence:
command = RunCustomFunctionCommand(function_handle, func_args)
self._commands_with_timeout.append((command, timeout))
def mark_done(self):
def mark_done(self, success: bool):
if self.callback is not None:
self.callback()
self.callback(success)
def get_commands_with_timeout(self) -> List[Tuple[BaseCommand, int]]:
""" Returns a list of all commands in the command_sequence

Просмотреть файл

@ -389,8 +389,8 @@ class TaskManager:
for visit_id, interrupted in visit_id_list:
self.logger.debug("Invoking callback of visit_id %d", visit_id)
cs = self.unsaved_command_sequences.pop(visit_id, None)
if cs and not interrupted:
cs.mark_done()
if cs:
cs.mark_done(not interrupted)
def _unpack_picked_error(self, pickled_error: bytes) -> Tuple[str, str]:
"""Unpacks `pickled_error` into and error `message` and `tb` string."""
@ -501,7 +501,7 @@ class TaskManager:
"visit_id": browser.curr_visit_id
}
self.sock.send(json.dumps(
[RECORD_TYPE_SPECIAL, interrupted_message]))
(RECORD_TYPE_SPECIAL, interrupted_message)))
if command_status == 'critical':
return
@ -630,7 +630,7 @@ class TaskManager:
command_sequence.reset = reset
self.execute_command_sequence(command_sequence, index=index)
def close(self, relaxed=True) -> None:
def close(self, relaxed: bool = True) -> None:
"""
Execute shutdown procedure for TaskManager
"""

Просмотреть файл

@ -6,7 +6,7 @@ import signal
import sys
import time
from threading import Lock
from typing import Callable
from typing import Any, Callable, List
import boto3
import sentry_sdk
@ -124,14 +124,15 @@ job_queue = rediswq.RedisWQ(
manager.logger.info("Worker with sessionID: %s" % job_queue.sessionID())
manager.logger.info("Initial queue state: empty=%s" % job_queue.empty())
unsaved_jobs = list()
unsaved_jobs: List[bytes] = list()
unsaved_jobs_lock = Lock()
shutting_down = False
def on_shutdown(manager, unsaved_jobs_lock):
def actual_callback(s: signal.Signals, __):
def on_shutdown(manager: TaskManager.TaskManager, unsaved_jobs_lock: Lock) \
-> Callable[[signal.Signals, Any], None]:
def actual_callback(s: signal.Signals, __: Any) -> None:
global shutting_down
manager.logger.error("Got interupted by %r, shutting down", s)
with unsaved_jobs_lock:
@ -149,11 +150,14 @@ for sig in [signal.SIGTERM, signal.SIGINT]:
def get_job_completion_callback(logger: logging.Logger,
unsaved_jobs_lock: Lock,
job_queue: rediswq.RedisWQ,
job: bytes) -> Callable[[], None]:
def callback() -> None:
job: bytes) -> Callable[[bool], None]:
def callback(sucess: bool) -> None:
with unsaved_jobs_lock:
logger.info("Job %r is done", job)
job_queue.complete(job)
if sucess:
logger.info("Job %r is done", job)
job_queue.complete(job)
else:
logger.warn("Job %r got interrupted", job)
unsaved_jobs.remove(job)
return callback