diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 07878eb23a..3f05356a7e 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -260,6 +260,12 @@ def worker(args):


 def initdb(args):
+    print("DB: " + conf.get('core', 'SQL_ALCHEMY_CONN'))
+    utils.initdb()
+    print("Done.")
+
+
+def resetdb(args):
     print("DB: " + conf.get('core', 'SQL_ALCHEMY_CONN'))
     if raw_input(
             "This will drop exisiting tables if they exist. "
@@ -415,10 +421,14 @@ def get_parser():
         default=conf.get('core', 'DAGS_FOLDER'))
     parser_master.set_defaults(func=master)

-    ht = "Initialize and reset the metadata database"
+    ht = "Initialize the metadata database"
     parser_initdb = subparsers.add_parser('initdb', help=ht)
     parser_initdb.set_defaults(func=initdb)

+    ht = "Burn down and rebuild the metadata database"
+    parser_resetdb = subparsers.add_parser('resetdb', help=ht)
+    parser_resetdb.set_defaults(func=resetdb)
+
     ht = "List the DAGs"
     parser_list_dags = subparsers.add_parser('list_dags', help=ht)
     parser_list_dags.set_defaults(func=list_dags)
diff --git a/airflow/utils.py b/airflow/utils.py
index e450304082..25a4e24f4d 100644
--- a/airflow/utils.py
+++ b/airflow/utils.py
@@ -58,6 +58,44 @@ def pessimistic_connection_handling():
             raise exc.DisconnectionError()
         cursor.close()

+
+def initdb():
+    from airflow import models
+    logging.info("Creating all tables")
+    models.Base.metadata.create_all(settings.engine)
+
+    # Creating the local_mysql DB connection
+    C = models.Connection
+    session = settings.Session()
+
+    conn = session.query(C).filter(C.conn_id == 'local_mysql').first()
+    if not conn:
+        session.add(
+            models.Connection(
+                conn_id='local_mysql', conn_type='mysql',
+                host='localhost', login='airflow', password='airflow',
+                schema='airflow'))
+        session.commit()
+
+    conn = session.query(C).filter(C.conn_id == 'presto_default').first()
+    if not conn:
+        session.add(
+            models.Connection(
+                conn_id='presto_default', conn_type='presto',
+                host='localhost',
+                schema='hive', port=10001))
+        session.commit()
+
+    conn = session.query(C).filter(C.conn_id == 'hive_default').first()
+    if not conn:
+        session.add(
+            models.Connection(
+                conn_id='hive_default', conn_type='hive',
+                host='localhost',
+                schema='default', port=10000))
+        session.commit()
+
+
 def resetdb():
     '''
     Clear out the database
@@ -66,37 +104,8 @@ def resetdb():

     logging.info("Dropping tables that exist")
     models.Base.metadata.drop_all(settings.engine)
+    initdb()

-    logging.info("Creating all tables")
-    models.Base.metadata.create_all(settings.engine)
-
-    # Creating the local_mysql DB connection
-    session = settings.Session()
-    session.query(models.Connection).delete()
-    session.add(
-        models.Connection(
-            conn_id='local_mysql', conn_type='mysql',
-            host='localhost', login='airflow', password='airflow',
-            schema='airflow'))
-    session.commit()
-    session.add(
-        models.Connection(
-            conn_id='mysql_default', conn_type='mysql',
-            host='localhost', login='airflow', password='airflow',
-            schema='airflow'))
-    session.commit()
-    session.add(
-        models.Connection(
-            conn_id='presto_default', conn_type='presto',
-            host='localhost',
-            schema='hive', port=10001))
-    session.commit()
-    session.add(
-        models.Connection(
-            conn_id='hive_default', conn_type='hive',
-            host='localhost',
-            schema='default', port=10000))
-    session.commit()

 def validate_key(k, max_length=250):
     if type(k) is not str:
diff --git a/run_unit_tests.sh b/run_unit_tests.sh
index 94eaf6f214..9c714b1ad4 100755
--- a/run_unit_tests.sh
+++ b/run_unit_tests.sh
@@ -1,3 +1,3 @@
 export AIRFLOW_CONFIG=~/airflow/unittests.cfg
 nosetests --with-doctest --with-coverage --cover-html --cover-package=airflow
-python -m SimpleHTTPServer 8001
+python -m SimpleHTTPServer 8002
diff --git a/tests/core.py b/tests/core.py
index e9d4cb799b..d373f22945 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -15,7 +15,7 @@ class CoreTest(unittest.TestCase):

     def setUp(self):
         configuration.test_mode()
-        utils.resetdb()
+        utils.initdb()
         self.dagbag = models.DagBag(
             dag_folder=DEV_NULL, include_examples=True)
         self.dag_bash = self.dagbag.dags['example_bash_operator']
@@ -31,6 +31,10 @@ class CoreTest(unittest.TestCase):
         job = jobs.LocalTaskJob(task_instance=ti, force=True)
         job.run()

+    def test_master_job(self):
+        job = jobs.MasterJob(dag_id='example_bash_operator')
+        job.run()
+
     def test_local_backfill_job(self):
         self.dag_bash.clear(
             start_date=DEFAULT_DATE,
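
Usage sketch (not part of the patch): after this change, initdb() only creates missing tables and seeds the default connections when they are absent, while resetdb() drops all tables first and then delegates to initdb(). Assuming a configured Airflow environment, the utils-level calls behind the two CLI subcommands look like this:

    from airflow import utils

    utils.initdb()    # idempotent: create tables, add default connections only if missing
    utils.resetdb()   # destructive: drop all tables, then rebuild via initdb()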