Merge pull request #37 from mistercrunch/pessimistic

Pessimistic pool connection handling for master and run
This commit is contained in:
Maxime Beauchemin 2014-11-30 15:05:05 -08:00
Родитель d9e26d1d77 6f91a67ada
Коммит affa5470cd
3 изменённых файлов: 22 добавлений и 2 удалений

Просмотреть файл

@ -43,6 +43,8 @@ def backfill(args):
def run(args):
settings.pessimistic_connection_handling()
# Setting up logging
directory = getconf().get('core', 'BASE_LOG_FOLDER') + \
"/{args.dag_id}/{args.task_id}".format(args=args)
@ -145,6 +147,7 @@ def webserver(args):
def master(args):
settings.pessimistic_connection_handling()
# Sleep time (seconds) between master runs
MASTER_SLEEP_INTERVAL = 60

Просмотреть файл

@ -2,10 +2,12 @@ import sys
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy.pool import Pool
from airflow.configuration import getconf
HEADER = """\
.__ _____.__
_____ |__|_______/ ____\ | ______ _ __
@ -14,6 +16,21 @@ _____ |__|_______/ ____\ | ______ _ __
(____ /__||__| |__| |____/\____/ \/\_/
\/"""
def pessimistic_connection_handling():
@event.listens_for(Pool, "checkout")
def ping_connection(dbapi_connection, connection_record, connection_proxy):
'''
Disconnect Handling - Pessimistic, taken from:
http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
'''
cursor = dbapi_connection.cursor()
try:
cursor.execute("SELECT 1")
except:
raise exc.DisconnectionError()
cursor.close()
BASE_FOLDER = getconf().get('core', 'BASE_FOLDER')
SQL_ALCHEMY_CONN = getconf().get('core', 'SQL_ALCHEMY_CONN')
if BASE_FOLDER not in sys.path:

Просмотреть файл

@ -28,7 +28,7 @@ for i in range(9):
i = str(i)
task = BashOperator(
task_id='runme_'+i,
bash_command='sleep {{ 10 + macros.random() * 10 }}',
bash_command='sleep 20',
**default_args)
task.set_downstream(run_this)
dag.add_task(task)