From 7e3049874eb225e345aba551d77fef09fb562ca9 Mon Sep 17 00:00:00 2001
From: Maxime
Date: Thu, 8 Jan 2015 21:50:11 +0000
Subject: [PATCH] Improvements to command line tools: test, list_dags, list_tasks

---
 TODO.md                       |  7 ++---
 airflow/bin/airflow           | 54 +++++++++++++++++++++++++++++++++++
 airflow/executors/__init__.py |  6 ++--
 3 files changed, 59 insertions(+), 8 deletions(-)

diff --git a/TODO.md b/TODO.md
index d3348a58b3..a0399f37a8 100644
--- a/TODO.md
+++ b/TODO.md
@@ -9,9 +9,6 @@ TODO
 * For each existing operator
 
 #### Command line
-* `airflow test dag_id task_id YYYY-MM-DD` (outputs log to stdout, doesnt care about dependencies, states)
-* `airflow list_dags`
-* `airflow list_tasks dag_idj`
 * `airflow task_state dag_id task_id YYYY-MM-DD`
 
 #### More Operators!
@@ -28,14 +25,14 @@
 
 #### Backend
 * CeleryExecutor
-* Master to derive BaseJob
 * Clear should kill running jobs
-* Mysql port should carry through
+* MySQL port should carry through (using the default for now)
 
 #### Misc
 * Write an hypervisor, looks for dead jobs without a heartbeat and kills
 * Authentication with Flask-Login and Flask-Principal
 * email_on_retry
+* Naming for the DatabaseConnection model was shortsighted; the same model can be used for any external connection (FTP, Samba, ...). Rename the model to Connection.
 
 #### Wishlist
 * Support for cron like synthax (0 * * * ) using croniter library

diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index e288f1ec5e..b13353706c 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -86,6 +86,37 @@ def run(args):
         force=args.force,
         ignore_dependencies=args.ignore_dependencies)
 
+def list_dags(args):
+    dagbag = DagBag()
+    print("\n".join(sorted(dagbag.dags)))
+
+def list_tasks(args):
+    dagbag = DagBag()
+    if args.dag_id not in dagbag.dags:
+        raise Exception('dag_id could not be found')
+    dag = dagbag.dags[args.dag_id]
+    if args.tree:
+        dag.tree_view()
+    else:
+        tasks = sorted(t.task_id for t in dag.tasks)
+        print("\n".join(tasks))
+
+def test(args):
+    log = logging.getLogger()
+    log.setLevel(logging.DEBUG)
+    logformat = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+    ch = logging.StreamHandler(sys.stdout)
+    ch.setFormatter(logformat)
+    log.addHandler(ch)
+    args.execution_date = dateutil.parser.parse(args.execution_date)
+    dagbag = DagBag(args.subdir)
+    if args.dag_id not in dagbag.dags:
+        raise Exception('dag_id could not be found')
+    dag = dagbag.dags[args.dag_id]
+    task = dag.get_task(task_id=args.task_id)
+    ti = TaskInstance(task, args.execution_date)
+    ti.run(force=True, ignore_dependencies=True)
 
 def clear(args):
     logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
@@ -281,6 +312,17 @@ if __name__ == '__main__':
         help="Serialized pickle object of the entire dag (used internally)")
     parser_run.set_defaults(func=run)
 
+    ht = "Test a task instance"
+    parser_test = subparsers.add_parser('test', help=ht)
+    parser_test.add_argument("dag_id", help="The id of the dag to run")
+    parser_test.add_argument("task_id", help="The task_id to run")
+    parser_test.add_argument(
+        "execution_date", help="The execution date to run")
+    parser_test.add_argument(
+        "-sd", "--subdir", help=subdir_help,
+        default=conf.get('core', 'DAGS_FOLDER'))
+    parser_test.set_defaults(func=test)
+
     ht = "Start a Airflow webserver instance"
     parser_webserver = subparsers.add_parser('webserver', help=ht)
     parser_webserver.add_argument(
@@ -310,6 +352,18 @@ if __name__ == '__main__':
     parser_initdb = subparsers.add_parser('initdb', help=ht)
     parser_initdb.set_defaults(func=initdb)
 
+    ht = "List the DAGs"
+    parser_list_dags = subparsers.add_parser('list_dags', help=ht)
+    parser_list_dags.set_defaults(func=list_dags)
+
+    ht = "List the tasks within a DAG"
+    parser_list_tasks = subparsers.add_parser('list_tasks', help=ht)
+    parser_list_tasks.add_argument(
+        "-t", "--tree", help="Tree view", action="store_true")
+    parser_list_tasks.add_argument(
+        "dag_id", help="The id of the dag")
+    parser_list_tasks.set_defaults(func=list_tasks)
+
     ht = "Start a Celery worker node"
     parser_worker = subparsers.add_parser('worker', help=ht)
     parser_worker.set_defaults(func=worker)

diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index 922c4d7639..8fdee8a935 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -1,17 +1,17 @@
 import logging
 
 from airflow.settings import conf
+from airflow.executors.local_executor import LocalExecutor
+from airflow.executors.celery_executor import CeleryExecutor
+from airflow.executors.sequential_executor import SequentialExecutor
 
 _EXECUTOR = conf.get('core', 'EXECUTOR')
 
 if _EXECUTOR == 'LocalExecutor':
-    from airflow.executors.local_executor import LocalExecutor
     DEFAULT_EXECUTOR = LocalExecutor()
 elif _EXECUTOR == 'CeleryExecutor':
-    from airflow.executors.celery_executor import CeleryExecutor
     DEFAULT_EXECUTOR = CeleryExecutor()
 elif _EXECUTOR == 'SequentialExecutor':
-    from airflow.executors.sequential_executor import SequentialExecutor
     DEFAULT_EXECUTOR = SequentialExecutor()
 else:
     raise Exception("Executor {0} not supported.".format(_EXECUTOR))
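
For reviewers, a quick smoke test of the new subcommands after applying this patch. `example_dag` and `example_task` are placeholder ids, not names introduced by the patch, and `execution_date` accepts any string that `dateutil.parser.parse` understands:

    airflow list_dags
    airflow list_tasks example_dag
    airflow list_tasks example_dag --tree
    airflow test example_dag example_task 2015-01-08

Note that `test` logs to stdout at DEBUG level and calls `ti.run(force=True, ignore_dependencies=True)`, so it actually executes the task while deliberately ignoring prior state and upstream dependencies; it is not a dry run.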