This commit is contained in:
vringar 2020-03-06 17:19:28 +01:00
Родитель 261fa6f4e7
Коммит 3904ea95a7
1 изменённых файлов: 18 добавлений и 4 удалений

Просмотреть файл

@ -3,6 +3,8 @@ import json
import os
import time
from threading import Lock
import boto3
import sentry_sdk
@ -108,17 +110,30 @@ job_queue = rediswq.RedisWQ(
manager.logger.info("Worker with sessionID: %s" % job_queue.sessionID())
manager.logger.info("Initial queue state: empty=%s" % job_queue.empty())
unsaved_jobs = list()
unsaved_jobs_lock = Lock()
# Crawl sites specified in job queue until empty
while not job_queue.empty():
job_queue.check_expired_leases()
with(unsaved_jobs_lock):
for unsaved_job in unsaved_jobs:
if not job_queue.renew_lease(unsaved_job,
TIMEOUT + DWELL_TIME + 30):
manager.logger.error("Unsaved job: %s timed out", unsaved_job)
job = job_queue.lease(
lease_secs=TIMEOUT + DWELL_TIME + 30, block=True, timeout=5
)
if job is None:
manager.logger.info("Waiting for work")
time.sleep(5)
continue
unsaved_jobs.append(job)
def mark_job_as_done():
with(unsaved_jobs_lock):
job_queue.complete(job)
unsaved_jobs.remove(job)
retry_number = job_queue.get_retry_number(job)
site_rank, site = job.decode("utf-8").split(',')
@ -126,12 +141,11 @@ while not job_queue.empty():
site = "http://" + site
manager.logger.info("Visiting %s..." % site)
command_sequence = CommandSequence.CommandSequence(
site, blocking=True, reset=True, retry_number=retry_number
site, blocking=True, reset=True, retry_number=retry_number,
callback=mark_job_as_done
)
command_sequence.get(sleep=DWELL_TIME, timeout=TIMEOUT)
manager.execute_command_sequence(command_sequence)
job_queue.complete(job)
manager.logger.info("Job queue finished, exiting.")
manager.close()