diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 8fb7d7126d..c4bff17985 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -142,6 +142,24 @@ def run(args): executor.queue_command(ti.key, cmd) executor.end() +def task_state(args): + """ + Returns the state of a TaskInstance at the command line. + + >>> airflow task_state tutorial sleep 2015-01-01 + success + """ + log_to_stdout() + args.execution_date = dateutil.parser.parse(args.execution_date) + iso = args.execution_date.isoformat() + dagbag = DagBag(args.subdir) + if args.dag_id not in dagbag.dags: + raise Exception('dag_id could not be found') + dag = dagbag.dags[args.dag_id] + task = dag.get_task(task_id=args.task_id) + ti = TaskInstance(task, args.execution_date) + print ti.current_state() + def list_dags(args): dagbag = DagBag() @@ -319,10 +337,10 @@ def get_parser(): help=mark_success_help, action="store_true") parser_backfill.add_argument( "-l", "--local", - help=mark_success_help, action="store_true") + help="Run the task using the LocalExecutor", action="store_true") parser_backfill.add_argument( "-a", "--include_adhoc", - help=mark_success_help, action="store_true") + help="Include dags with the adhoc parameter.", action="store_true") parser_backfill.add_argument( "-i", "--ignore_dependencies", help=( @@ -409,6 +427,17 @@ def get_parser(): default=DAGS_FOLDER) parser_test.set_defaults(func=test) + ht = "Get the status of a task instance." + parser_task_state = subparsers.add_parser('task_state', help=ht) + parser_task_state.add_argument("dag_id", help="The id of the dag to check") + parser_task_state.add_argument("task_id", help="The task_id to check") + parser_task_state.add_argument( + "execution_date", help="The execution date to check") + parser_task_state.add_argument( + "-sd", "--subdir", help=subdir_help, + default=DAGS_FOLDER) + parser_task_state.set_defaults(func=task_state) + ht = "Start a Airflow webserver instance" parser_webserver = subparsers.add_parser('webserver', help=ht) parser_webserver.add_argument(