Merge pull request #211 from mistercrunch/cli_task_state

add task_state command and correct help messages for misc options
This commit is contained in:
Arthur Wiedmer 2015-05-12 08:51:03 -07:00
Родитель b9fc46ca5c 07b3ff9210
Коммит 6fc5c86a79
1 изменённых файлов: 31 добавлений и 2 удалений

Просмотреть файл

@ -142,6 +142,24 @@ def run(args):
executor.queue_command(ti.key, cmd) executor.queue_command(ti.key, cmd)
executor.end() executor.end()
def task_state(args):
"""
Returns the state of a TaskInstance at the command line.
>>> airflow task_state tutorial sleep 2015-01-01
success
"""
log_to_stdout()
args.execution_date = dateutil.parser.parse(args.execution_date)
iso = args.execution_date.isoformat()
dagbag = DagBag(args.subdir)
if args.dag_id not in dagbag.dags:
raise Exception('dag_id could not be found')
dag = dagbag.dags[args.dag_id]
task = dag.get_task(task_id=args.task_id)
ti = TaskInstance(task, args.execution_date)
print ti.current_state()
def list_dags(args): def list_dags(args):
dagbag = DagBag() dagbag = DagBag()
@ -319,10 +337,10 @@ def get_parser():
help=mark_success_help, action="store_true") help=mark_success_help, action="store_true")
parser_backfill.add_argument( parser_backfill.add_argument(
"-l", "--local", "-l", "--local",
help=mark_success_help, action="store_true") help="Run the task using the LocalExecutor", action="store_true")
parser_backfill.add_argument( parser_backfill.add_argument(
"-a", "--include_adhoc", "-a", "--include_adhoc",
help=mark_success_help, action="store_true") help="Include dags with the adhoc parameter.", action="store_true")
parser_backfill.add_argument( parser_backfill.add_argument(
"-i", "--ignore_dependencies", "-i", "--ignore_dependencies",
help=( help=(
@ -409,6 +427,17 @@ def get_parser():
default=DAGS_FOLDER) default=DAGS_FOLDER)
parser_test.set_defaults(func=test) parser_test.set_defaults(func=test)
ht = "Get the status of a task instance."
parser_task_state = subparsers.add_parser('task_state', help=ht)
parser_task_state.add_argument("dag_id", help="The id of the dag to check")
parser_task_state.add_argument("task_id", help="The task_id to check")
parser_task_state.add_argument(
"execution_date", help="The execution date to check")
parser_task_state.add_argument(
"-sd", "--subdir", help=subdir_help,
default=DAGS_FOLDER)
parser_task_state.set_defaults(func=task_state)
ht = "Start a Airflow webserver instance" ht = "Start a Airflow webserver instance"
parser_webserver = subparsers.add_parser('webserver', help=ht) parser_webserver = subparsers.add_parser('webserver', help=ht)
parser_webserver.add_argument( parser_webserver.add_argument(