Breaking downt initdb into initdb and resetdb
This commit is contained in:
Родитель
bdb205aebc
Коммит
836772ffdb
|
@ -260,6 +260,12 @@ def worker(args):
|
|||
|
||||
|
||||
def initdb(args):
|
||||
print("DB: " + conf.get('core', 'SQL_ALCHEMY_CONN'))
|
||||
utils.initdb()
|
||||
print("Done.")
|
||||
|
||||
|
||||
def resetdb(args):
|
||||
print("DB: " + conf.get('core', 'SQL_ALCHEMY_CONN'))
|
||||
if raw_input(
|
||||
"This will drop exisiting tables if they exist. "
|
||||
|
@ -415,10 +421,14 @@ def get_parser():
|
|||
default=conf.get('core', 'DAGS_FOLDER'))
|
||||
parser_master.set_defaults(func=master)
|
||||
|
||||
ht = "Initialize and reset the metadata database"
|
||||
ht = "Initialize the metadata database"
|
||||
parser_initdb = subparsers.add_parser('initdb', help=ht)
|
||||
parser_initdb.set_defaults(func=initdb)
|
||||
|
||||
ht = "Burn down and rebuild the metadata database"
|
||||
parser_resetdb = subparsers.add_parser('resetdb', help=ht)
|
||||
parser_resetdb.set_defaults(func=resetdb)
|
||||
|
||||
ht = "List the DAGs"
|
||||
parser_list_dags = subparsers.add_parser('list_dags', help=ht)
|
||||
parser_list_dags.set_defaults(func=list_dags)
|
||||
|
|
|
@ -58,6 +58,44 @@ def pessimistic_connection_handling():
|
|||
raise exc.DisconnectionError()
|
||||
cursor.close()
|
||||
|
||||
|
||||
def initdb():
|
||||
from airflow import models
|
||||
logging.info("Creating all tables")
|
||||
models.Base.metadata.create_all(settings.engine)
|
||||
|
||||
# Creating the local_mysql DB connection
|
||||
C = models.Connection
|
||||
session = settings.Session()
|
||||
|
||||
conn = session.query(C).filter(C.conn_id == 'local_mysql').first()
|
||||
if not conn:
|
||||
session.add(
|
||||
models.Connection(
|
||||
conn_id='local_mysql', conn_type='mysql',
|
||||
host='localhost', login='airflow', password='airflow',
|
||||
schema='airflow'))
|
||||
session.commit()
|
||||
|
||||
conn = session.query(C).filter(C.conn_id == 'presto_default').first()
|
||||
if not conn:
|
||||
session.add(
|
||||
models.Connection(
|
||||
conn_id='presto_default', conn_type='presto',
|
||||
host='localhost',
|
||||
schema='hive', port=10001))
|
||||
session.commit()
|
||||
|
||||
conn = session.query(C).filter(C.conn_id == 'hive_default').first()
|
||||
if not conn:
|
||||
session.add(
|
||||
models.Connection(
|
||||
conn_id='hive_default', conn_type='hive',
|
||||
host='localhost',
|
||||
schema='default', port=10000))
|
||||
session.commit()
|
||||
|
||||
|
||||
def resetdb():
|
||||
'''
|
||||
Clear out the database
|
||||
|
@ -66,37 +104,8 @@ def resetdb():
|
|||
|
||||
logging.info("Dropping tables that exist")
|
||||
models.Base.metadata.drop_all(settings.engine)
|
||||
initdb()
|
||||
|
||||
logging.info("Creating all tables")
|
||||
models.Base.metadata.create_all(settings.engine)
|
||||
|
||||
# Creating the local_mysql DB connection
|
||||
session = settings.Session()
|
||||
session.query(models.Connection).delete()
|
||||
session.add(
|
||||
models.Connection(
|
||||
conn_id='local_mysql', conn_type='mysql',
|
||||
host='localhost', login='airflow', password='airflow',
|
||||
schema='airflow'))
|
||||
session.commit()
|
||||
session.add(
|
||||
models.Connection(
|
||||
conn_id='mysql_default', conn_type='mysql',
|
||||
host='localhost', login='airflow', password='airflow',
|
||||
schema='airflow'))
|
||||
session.commit()
|
||||
session.add(
|
||||
models.Connection(
|
||||
conn_id='presto_default', conn_type='presto',
|
||||
host='localhost',
|
||||
schema='hive', port=10001))
|
||||
session.commit()
|
||||
session.add(
|
||||
models.Connection(
|
||||
conn_id='hive_default', conn_type='hive',
|
||||
host='localhost',
|
||||
schema='default', port=10000))
|
||||
session.commit()
|
||||
|
||||
def validate_key(k, max_length=250):
|
||||
if type(k) is not str:
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
export AIRFLOW_CONFIG=~/airflow/unittests.cfg
|
||||
nosetests --with-doctest --with-coverage --cover-html --cover-package=airflow
|
||||
python -m SimpleHTTPServer 8001
|
||||
python -m SimpleHTTPServer 8002
|
||||
|
|
|
@ -15,7 +15,7 @@ class CoreTest(unittest.TestCase):
|
|||
|
||||
def setUp(self):
|
||||
configuration.test_mode()
|
||||
utils.resetdb()
|
||||
utils.initdb()
|
||||
self.dagbag = models.DagBag(
|
||||
dag_folder=DEV_NULL, include_examples=True)
|
||||
self.dag_bash = self.dagbag.dags['example_bash_operator']
|
||||
|
@ -31,6 +31,10 @@ class CoreTest(unittest.TestCase):
|
|||
job = jobs.LocalTaskJob(task_instance=ti, force=True)
|
||||
job.run()
|
||||
|
||||
def test_master_job(self):
|
||||
job = jobs.MaterJob(dag_id='example_bash_operator')
|
||||
job.run()
|
||||
|
||||
def test_local_backfill_job(self):
|
||||
self.dag_bash.clear(
|
||||
start_date=DEFAULT_DATE,
|
||||
|
|
Загрузка…
Ссылка в новой задаче