Merge pull request #11 from mistercrunch/tweaks

A few tweaks while running core_cx
Maxime Beauchemin 2014-11-17 21:55:45 -08:00
Parents 8e4c3762c3 cd83e7aa60
Commit 9ef4fbdaf6
5 changed files: 26 additions and 19 deletions

View file

@@ -137,8 +137,9 @@ def master(args):
         sq = session.query(
             TI.task_id,
             func.max(TI.execution_date).label('max_ti')
-        ).filter(
-            TI.dag_id == dag.dag_id).group_by(TI.task_id).subquery('sq')
+        ).filter(TI.dag_id == dag.dag_id).group_by(TI.task_id).subquery(
+            'sq')
         qry = session.query(TI).filter(
             TI.dag_id == dag.dag_id,
             TI.task_id == sq.c.task_id,
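The query being rewrapped here is the classic greatest-row-per-group pattern: a subquery picks max(execution_date) per task_id, and the outer query joins back against it to recover the full latest row for each task. Below is a minimal runnable sketch of the same shape against a toy model and an in-memory SQLite engine; the model and DAG name are illustrative stand-ins, not Airflow's actual classes, and the imports use the modern sqlalchemy.orm locations (1.4+) rather than the 2014-era ones.

from sqlalchemy import Column, DateTime, String, create_engine, func
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class TI(Base):  # stand-in for Airflow's TaskInstance
    __tablename__ = 'task_instance'
    task_id = Column(String(250), primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    dag_id = Column(String(250))

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Subquery: newest execution_date per task_id within one DAG.
sq = session.query(
    TI.task_id,
    func.max(TI.execution_date).label('max_ti')
).filter(TI.dag_id == 'some_dag').group_by(TI.task_id).subquery('sq')

# Outer query: join back to fetch the full latest row per task.
latest = session.query(TI).filter(
    TI.dag_id == 'some_dag',
    TI.task_id == sq.c.task_id,
    TI.execution_date == sq.c.max_ti,
).all()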
@@ -151,6 +152,7 @@ def master(args):
         for task in dag.tasks:
             if task.task_id not in ti_dict:
                 # Brand new task, let's get started
+                print "SD:" + str(task.start_date)
                 ti = TI(task, task.start_date)
                 executor.queue_command(ti.key, ti.command())
             else:
@@ -227,7 +229,7 @@ def initdb(args):
 if __name__ == '__main__':
+    logging.root.handlers = []
     parser = argparse.ArgumentParser()
     subparsers = parser.add_subparsers(help='sub-command help')
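Resetting logging.root.handlers before the CLI configures logging is a common trick for avoiding doubled log lines when some import has already attached a root handler, since logging.basicConfig is a no-op once any root handler exists. A small illustration of the mechanism (not this file's actual setup code):

import logging

logging.basicConfig(level=logging.INFO)  # suppose an import already did this
logging.root.handlers = []               # drop the stale handler(s)
logging.basicConfig(                     # now this call takes effect
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
)
logging.info('configured exactly once')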

View file

@@ -45,7 +45,7 @@ class LocalWorker(multiprocessing.Process):
 class LocalExecutor(BaseExecutor):
 
-    def __init__(self, parallelism=8):
+    def __init__(self, parallelism=4):
         super(LocalExecutor, self).__init__()
         self.parallelism = parallelism
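parallelism caps how many task commands run concurrently; this tweak halves the default from 8 to 4. Conceptually the executor is a fixed pool of processes draining a shared queue of shell commands, roughly like the sketch below, which illustrates the shape rather than Airflow's actual implementation:

import multiprocessing
import subprocess

def worker(task_queue):
    # Pull commands until the None sentinel arrives, then exit.
    for command in iter(task_queue.get, None):
        subprocess.call(command, shell=True)

if __name__ == '__main__':
    parallelism = 4  # the default this commit lowers from 8
    queue = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=worker, args=(queue,))
        for _ in range(parallelism)
    ]
    for w in workers:
        w.start()
    for cmd in ['echo one', 'echo two', 'echo three']:
        queue.put(cmd)
    for _ in workers:
        queue.put(None)  # one sentinel per worker
    for w in workers:
        w.join()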

View file

@@ -57,7 +57,6 @@ class HiveHook(BaseHook):
         return self.hive
 
     def check_for_partition(self, schema, table, partition):
-        try:
-            self.hive._oprot.trans.open()
-            partitions = self.hive.get_partitions_by_filter(
-                schema, table, partition, 1)
+        self.hive._oprot.trans.open()
+        partitions = self.hive.get_partitions_by_filter(
+            schema, table, partition, 1)
@@ -66,9 +65,6 @@ class HiveHook(BaseHook):
-                return True
-            else:
-                return False
-        except Exception as e:
-            logging.error(e)
-            return False
+            return True
+        else:
+            return False
 
     def get_records(self, hql, schema=None):
         self.hive._oprot.trans.open()
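These two hunks drop a blanket except Exception that logged the error and returned False. The before/after sketch below shows why that matters: swallowing the exception makes a metastore outage indistinguishable from "partition not found", so callers act on a false negative. get_partitions here is a hypothetical stand-in for the Thrift client call, not the real API.

import logging

def get_partitions(schema, table, partition):
    raise IOError("metastore unreachable")  # simulate a connectivity failure

def check_swallowed(schema, table, partition):
    # Before the change: every failure collapses into False.
    try:
        return bool(get_partitions(schema, table, partition))
    except Exception as e:
        logging.error(e)
        return False  # indistinguishable from a genuinely missing partition

def check_propagated(schema, table, partition):
    # After the change: connectivity errors surface to the caller,
    # who can fail the task or retry instead of trusting a false negative.
    return bool(get_partitions(schema, table, partition))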

View file

@@ -10,6 +10,7 @@ import pickle
 import re
 from time import sleep
 
+import sqlalchemy
 from sqlalchemy import (
     Column, Integer, String, DateTime, Text,
     ForeignKey, func
@@ -17,6 +18,7 @@ from sqlalchemy import (
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.ext.serializer import loads, dumps
 from sqlalchemy.orm import relationship
+from sqlalchemy.dialects.mysql import LONGTEXT
 from airflow.executors import DEFAULT_EXECUTOR
 from airflow.configuration import getconf
 from airflow import settings
@@ -152,7 +154,7 @@ class DagPickle(Base):
     the database.
     """
     id = Column(Integer, primary_key=True)
-    pickle = Column(Text())
+    pickle = Column(LONGTEXT())
 
     __tablename__ = "dag_pickle"
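The motivation for this change: on MySQL, SQLAlchemy's generic Text renders as TEXT, which caps at 64KB, so a large pickled DAG can be truncated (or rejected, depending on SQL mode); LONGTEXT raises the cap to 4GB. A dialect-portable alternative, offered here as a suggestion rather than what this commit does, keeps the generic type on other backends via with_variant:

from sqlalchemy import Column, Integer, Text
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class DagPickle(Base):
    __tablename__ = 'dag_pickle'
    id = Column(Integer, primary_key=True)
    # Renders as LONGTEXT on MySQL, plain TEXT on other backends:
    pickle = Column(Text().with_variant(LONGTEXT(), 'mysql'))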
@@ -461,6 +463,7 @@ class TaskInstance(Base):
                 self.state = State.FAILED
                 session.merge(self)
                 session.commit()
+                logging.error(str(e))
                 raise e
 
         self.end_date = datetime.now()
@@ -594,7 +597,7 @@ class BackfillJob(BaseJob):
         # Triggering what is ready to get triggered
         while task_instances:
             for key, ti in task_instances.items():
-                if ti.state == State.SUCCESS:
+                if ti.state == State.SUCCESS and key in task_instances:
                     del task_instances[key]
                 elif ti.is_runnable():
                     executor.queue_command(
@@ -622,6 +625,7 @@ class BackfillJob(BaseJob):
                         del task_instances[key]
                     for task_id in downstream:
                         key = (ti.dag_id, task_id, execution_date)
-                        del task_instances[key]
+                        if key in task_instances:
+                            del task_instances[key]
                 elif ti.state == State.SUCCESS:
                     del task_instances[key]
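Both guards added in these hunks protect the same invariant: the failure branch eagerly prunes downstream entries, so a key may already be gone by the time the loop reaches it, and a second del raises KeyError. (Under Python 2, .items() returns a list, so mutating the dict mid-loop is otherwise safe; Python 3's view objects would forbid it outright.) A minimal reproduction of the failure mode the guard prevents, written in Python 3 form:

task_instances = {'a': 'failed', 'b': 'downstream_of_a'}

for key in list(task_instances):   # iterate over a snapshot of the keys
    if key == 'a':
        del task_instances[key]
        del task_instances['b']    # prune a downstream entry eagerly
    elif key in task_instances:    # without this guard: KeyError on 'b'
        del task_instances[key]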

View file

@@ -1,9 +1,13 @@
 import os
 import sys
 
+import sqlalchemy
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy import create_engine
 
 from configuration import getconf
 
 """
 if 'AIRFLOW_HOME' not in os.environ:
     os.environ['AIRFLOW_HOME'] = os.path.join(os.path.dirname(__file__), "..")
@@ -17,7 +21,8 @@ if BASE_FOLDER not in sys.path:
 Session = sessionmaker()
 #engine = create_engine('mysql://airflow:airflow@localhost/airflow')
-engine = create_engine(SQL_ALCHEMY_CONN)
+engine = create_engine(
+    SQL_ALCHEMY_CONN, pool_size=25)
 Session.configure(bind=engine)
 
 # can't move this to configuration due to ConfigParser interpolation
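SQLAlchemy's default QueuePool keeps only 5 connections (pool_size=5, max_overflow=10), which a process running many concurrent sessions can exhaust, leaving callers blocked waiting for a checkout; pool_size=25 widens that. A sketch of the relevant knobs, where the DSN and the extra arguments beyond pool_size are placeholders rather than part of this commit:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    'mysql://airflow:airflow@localhost/airflow',  # placeholder DSN
    pool_size=25,       # connections kept open in the pool
    max_overflow=10,    # extra connections allowed under burst load
    pool_recycle=3600,  # optional: recycle before MySQL's wait_timeout
)
Session = sessionmaker(bind=engine)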