Adding a timeout Context object and using it when importing dags

This commit is contained in:
Maxime 2015-06-18 05:04:10 +00:00
Родитель 990956781f
Коммит d402831818
2 изменённых файлов: 24 добавлений и 1 удалений

Просмотреть файл

@ -156,7 +156,8 @@ class DagBag(object):
logging.info("Importing " + filepath)
if mod_name in sys.modules:
del sys.modules[mod_name]
m = imp.load_source(mod_name, filepath)
with utils.timeout(30):
m = imp.load_source(mod_name, filepath)
except:
logging.error("Failed to import: " + filepath)
logging.exception("")

Просмотреть файл

@ -10,6 +10,7 @@ import logging
import os
import re
import shutil
import signal
import smtplib
from tempfile import mkdtemp
@ -408,3 +409,24 @@ def TemporaryDirectory(suffix='', prefix=None, dir=None):
# ENOENT - no such file or directory
if e.errno != errno.ENOENT:
raise e
class TimeoutError(Exception):
pass
class timeout:
"""
To be used in a ``with`` block and timeout its content.
"""
def __init__(self, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
def handle_timeout(self, signum, frame):
logging.error("Process timed out")
raise TimeoutError(self.error_message)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)