This commit is contained in:
Stefan Zabka 2020-02-28 17:39:39 +01:00
Parent 86a7d575ba
Commit 4e418b37b5
8 changed files with 45 additions and 27 deletions

View File

@@ -171,5 +171,6 @@ class CommandSequence:
         self.commands_with_timeout.append((command, timeout))

     def markDone(self):
+        print("Callback invoked")
         if self.callback is not None:
             self.callback()
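The callback hook can be exercised in isolation; a minimal sketch, assuming only the callback constructor parameter used in the demo script below (on_done is a hypothetical name):

from automation import CommandSequence

def on_done():
    print("All records for this visit have been saved")

cs = CommandSequence.CommandSequence('http://www.example.com',
                                     callback=on_done)
# In a real crawl, markDone is invoked by the TaskManager once the
# data aggregator reports the visit as saved; calling it directly
# here only demonstrates the flow.
cs.markDone()  # prints "Callback invoked", then runs on_done()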

View File

@@ -1,4 +1,5 @@
 import abc
+import json
 import logging
 import queue
 import threading
@@ -51,6 +52,7 @@ class BaseListener(object):
         self._last_update = time.time()  # last status update time
         self.record_queue = None  # Initialized on `startup`
         self.logger = logging.getLogger('openwpm')
+        self.in_progress_map = dict()  # maps crawl_id to visit_id

     @abc.abstractmethod
     def process_record(self, record):
@@ -102,9 +104,42 @@ class BaseListener(object):
             )
             self._last_update = time.time()

+    def update_records(self, table: str, data: Dict[str, Any]):
+        """Keep track of which browser is working on which visit_id.
+        The data should contain a visit_id and a crawl_id, but the
+        method handles either one being missing.
+        """
+        visit_id = None
+        crawl_id = None
+        # All data records should be keyed by the crawler and site visit
+        try:
+            visit_id = data['visit_id']
+        except KeyError:
+            self.logger.error("Record for table %s has no visit id" % table)
+            self.logger.error(json.dumps(data))
+        try:
+            crawl_id = data['crawl_id']
+        except KeyError:
+            self.logger.error("Record for table %s has no crawl id" % table)
+            self.logger.error(json.dumps(data))
+        if crawl_id is not None and visit_id is not None:
+            # Check if the browser for this record has moved on to a new visit
+            if crawl_id not in self.in_progress_map:
+                self.in_progress_map[crawl_id] = visit_id
+            elif self.in_progress_map[crawl_id] != visit_id:
+                self.mark_visit_id_done(self.in_progress_map[crawl_id])
+                self.in_progress_map[crawl_id] = visit_id
+
+    def mark_visit_id_done(self, visit_id: int):
+        """Indicates that all records relating to a given visit_id
+        have been saved."""
+        self.logger.debug("Putting visit_id {0} into queue".format(visit_id))
+        self.completion_queue.put(visit_id)
+
     def shutdown(self):
@@ -112,6 +147,8 @@ class BaseListener(object):
         Note: Child classes should call this method"""
         self.sock.close()
+        for visit_id in self.in_progress_map.values():
+            self.mark_visit_id_done(visit_id)

     def drain_queue(self):
         """ Ensures queue is empty before closing """
@@ -200,7 +237,7 @@ class BaseAggregator(object):
         been finished since the last time this method was called"""
         finished_visit_ids = list()
         while not self.completion_queue.empty():
-            finished_visit_ids.append(self.status_queue.get())
+            finished_visit_ids.append(self.completion_queue.get())
         return finished_visit_ids

     def launch(self, listener_process_runner, *args):
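On the aggregator side the completion queue is simply drained; the same pattern in isolation, as a runnable sketch:

import queue

completion_queue = queue.Queue()
for v in (10, 11):
    completion_queue.put(v)

# Same drain loop as BaseAggregator.get_saved_visit_ids above.
finished_visit_ids = list()
while not completion_queue.empty():
    finished_visit_ids.append(completion_queue.get())
print(finished_visit_ids)  # [10, 11]

Because the aggregator is the queue's only consumer, the empty()/get() pairing cannot block here; with multiple consumers, get_nowait() guarded by queue.Empty would be the safer idiom.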

View File

@@ -64,7 +64,6 @@ class LocalListener(BaseListener):
         self._ldb_commit_time = 0
         self._sql_counter = 0
         self._sql_commit_time = 0
-        self.browser_map = dict()  # maps crawl_id to visit_id

         super(LocalListener, self).__init__(*base_params)
@@ -98,27 +97,7 @@ class LocalListener(BaseListener):
         elif table == RECORD_TYPE_CONTENT:
             self.process_content(record)
             return
-        # All data records should be keyed by the crawler and site visit
-        try:
-            visit_id = data['visit_id']
-        except KeyError:
-            self.logger.error("Record for table %s has no visit id" % table)
-            self.logger.error(json.dumps(data))
-            return
-        try:
-            crawl_id = data['crawl_id']
-        except KeyError:
-            self.logger.error("Record for table %s has no crawl id" % table)
-            self.logger.error(json.dumps(data))
-            return
-
-        # Check if the browser for this record has moved on to a new visit
-        if crawl_id not in self.browser_map:
-            self.browser_map[crawl_id] = visit_id
-        elif self.browser_map[crawl_id] != visit_id:
-            self.mark_visit_id_done(self.browser_map[crawl_id])
-            self.browser_map[crawl_id] = visit_id
+        self.update_records(table, data)

         statement, args = self._generate_insert(
             table=table, data=data)

View File

@@ -278,7 +278,6 @@ class S3Listener(BaseListener):
         elif self.browser_map[crawl_id] != visit_id:
             self._create_batch(self.browser_map[crawl_id])
             self._send_to_s3()
-            self.mark_visit_id_done(self.browser_map[crawl_id])
             self.browser_map[crawl_id] = visit_id

         # Convert data to text type

File diff suppressed because one or more lines are too long

View File

@@ -347,7 +347,7 @@ class TaskManager:
         while True:
             visit_id_list = self.data_aggregator.get_saved_visit_ids()
             if not visit_id_list:
-                time.sleep(5)
+                time.sleep(1)
             else:
                 for visit_id in visit_id_list:
                     self.unsaved_command_sequences.pop(visit_id).markDone()
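The loop's behavior is easy to simulate; a bounded sketch, where _Seq, pending, and the canned get_saved_visit_ids are hypothetical stand-ins for CommandSequence, unsaved_command_sequences, and the aggregator:

import time

class _Seq:
    def markDone(self):
        print("Callback invoked")

pending = {10: _Seq()}
polls = [[], [10]]  # first poll: nothing saved yet; second: visit 10

def get_saved_visit_ids():
    return polls.pop(0) if polls else []

# Same shape as the TaskManager loop above, bounded so it terminates.
while pending:
    visit_id_list = get_saved_visit_ids()
    if not visit_id_list:
        time.sleep(1)  # the commit lowers this back-off from 5s to 1s
    else:
        for visit_id in visit_id_list:
            pending.pop(visit_id).markDone()

Lowering the sleep from five seconds to one trades a little idle polling for noticeably lower latency between a visit finishing and its callback firing.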

View File

@@ -2,7 +2,7 @@
 from automation import CommandSequence, TaskManager

 # The list of sites that we wish to crawl
-NUM_BROWSERS = 3
+NUM_BROWSERS = 1
 sites = ['http://www.example.com',
          'http://www.princeton.edu',
          'http://citp.princeton.edu/']
@@ -40,7 +40,8 @@ for site in sites:
     # Parallelize sites over the number of browsers set above.
     # (To have all browsers go to the same sites, add `index='**'`)
-    command_sequence = CommandSequence.CommandSequence(site, reset=True)
+    command_sequence = CommandSequence.CommandSequence(site, reset=True,
+        callback=lambda: print("CommandSequence done"))

     # Start by visiting the page
     command_sequence.get(sleep=3, timeout=60)
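If the callback should report which site finished, bind the loop variable as a default argument; a lambda that referenced site directly would close over the variable and report the last site for every visit (a hypothetical variation on the demo, not part of this commit):

command_sequence = CommandSequence.CommandSequence(
    site, reset=True,
    callback=lambda s=site: print("CommandSequence for %s done" % s))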