Improvements to command line tools: test, list_dags, list_tasks

Maxime 2015-01-08 21:50:11 +00:00
Parent 1250b180de
Commit 7e3049874e
3 changed files with 59 additions and 8 deletions

View file

@@ -9,9 +9,6 @@ TODO
 * For each existing operator
 #### Command line
-* `airflow test dag_id task_id YYYY-MM-DD` (outputs log to stdout, ignores dependencies and states)
-* `airflow list_dags`
-* `airflow list_tasks dag_id`
 * `airflow task_state dag_id task_id YYYY-MM-DD`
 #### More Operators!
@@ -28,14 +25,14 @@ TODO
 #### Backend
 * CeleryExecutor
 * Master to derive from BaseJob
 * Clear should kill running jobs
-* Mysql port should carry through
+* Mysql port should carry through (using default now)
 #### Misc
 * Write a hypervisor that looks for dead jobs without a heartbeat and kills them
 * Authentication with Flask-Login and Flask-Principal
 * email_on_retry
 * Naming for the DatabaseConnection model was shortsighted: the same model can be used for any external connection (FTP, samba, ...), so rename the model to Connection.
 #### Wishlist
 * Support for cron-like syntax (0 * * * *) using the croniter library
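The croniter item is only a wishlist note; nothing in this commit touches scheduling. As a rough sketch of what the library would offer, assuming croniter's documented `croniter(expr, start_time)` / `get_next()` interface:

```python
from datetime import datetime
from croniter import croniter

# "0 * * * *" fires at minute zero of every hour.
it = croniter("0 * * * *", datetime(2015, 1, 8, 21, 50))
print(it.get_next(datetime))  # 2015-01-08 22:00:00
print(it.get_next(datetime))  # 2015-01-08 23:00:00
```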

View file

@@ -86,6 +86,37 @@ def run(args):
         force=args.force,
         ignore_dependencies=args.ignore_dependencies)
+
+def list_dags(args):
+    dagbag = DagBag()
+    print("\n".join(sorted(dagbag.dags)))
+
+def list_tasks(args):
+    dagbag = DagBag()
+    if args.dag_id not in dagbag.dags:
+        raise Exception('dag_id could not be found')
+    dag = dagbag.dags[args.dag_id]
+    if args.tree:
+        dag.tree_view()
+    else:
+        tasks = sorted(t.task_id for t in dag.tasks)
+        print("\n".join(tasks))
+
+def test(args):
+    log = logging.getLogger()
+    log.setLevel(logging.DEBUG)
+    logformat = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+    ch = logging.StreamHandler(sys.stdout)
+    ch.setFormatter(logformat)
+    log.addHandler(ch)
+    args.execution_date = dateutil.parser.parse(args.execution_date)
+    dagbag = DagBag(args.subdir)
+    if args.dag_id not in dagbag.dags:
+        raise Exception('dag_id could not be found')
+    dag = dagbag.dags[args.dag_id]
+    task = dag.get_task(task_id=args.task_id)
+    ti = TaskInstance(task, args.execution_date)
+    ti.run(force=True, ignore_dependencies=True)
+
 def clear(args):
     logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
@@ -281,6 +312,17 @@ if __name__ == '__main__':
         help="Serialized pickle object of the entire dag (used internally)")
     parser_run.set_defaults(func=run)
+
+    ht = "Test a task instance"
+    parser_test = subparsers.add_parser('test', help=ht)
+    parser_test.add_argument("dag_id", help="The id of the dag to run")
+    parser_test.add_argument("task_id", help="The task_id to run")
+    parser_test.add_argument(
+        "execution_date", help="The execution date to run")
+    parser_test.add_argument(
+        "-sd", "--subdir", help=subdir_help,
+        default=conf.get('core', 'DAGS_FOLDER'))
+    parser_test.set_defaults(func=test)
 
     ht = "Start an Airflow webserver instance"
     parser_webserver = subparsers.add_parser('webserver', help=ht)
     parser_webserver.add_argument(
@@ -310,6 +352,18 @@ if __name__ == '__main__':
     parser_initdb = subparsers.add_parser('initdb', help=ht)
     parser_initdb.set_defaults(func=initdb)
+
+    ht = "List the DAGs"
+    parser_list_dags = subparsers.add_parser('list_dags', help=ht)
+    parser_list_dags.set_defaults(func=list_dags)
+
+    ht = "List the tasks within a DAG"
+    parser_list_tasks = subparsers.add_parser('list_tasks', help=ht)
+    parser_list_tasks.add_argument(
+        "-t", "--tree", help="Tree view", action="store_true")
+    parser_list_tasks.add_argument(
+        "dag_id", help="The id of the dag")
+    parser_list_tasks.set_defaults(func=list_tasks)
 
     ht = "Start a Celery worker node"
     parser_worker = subparsers.add_parser('worker', help=ht)
     parser_worker.set_defaults(func=worker)
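
With the parsers above wired to their handlers, the new subcommands would be invoked roughly as follows (`example_dag` and `print_date` are hypothetical ids shown only to illustrate the argparse wiring; the execution date goes through `dateutil.parser`, so ISO `YYYY-MM-DD` works):

```
# hypothetical dag/task ids
airflow list_dags
airflow list_tasks example_dag --tree
airflow test example_dag print_date 2015-01-08
```

Since `test` calls `ti.run(force=True, ignore_dependencies=True)`, it re-runs the single task regardless of prior state and without scheduling upstream tasks, which is what makes it useful for iterating on one operator.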

View file

@@ -1,17 +1,17 @@
 import logging
 
 from airflow.settings import conf
-from airflow.executors.local_executor import LocalExecutor
-from airflow.executors.celery_executor import CeleryExecutor
-from airflow.executors.sequential_executor import SequentialExecutor
 
 _EXECUTOR = conf.get('core', 'EXECUTOR')
 
 if _EXECUTOR == 'LocalExecutor':
+    from airflow.executors.local_executor import LocalExecutor
     DEFAULT_EXECUTOR = LocalExecutor()
 elif _EXECUTOR == 'CeleryExecutor':
+    from airflow.executors.celery_executor import CeleryExecutor
     DEFAULT_EXECUTOR = CeleryExecutor()
 elif _EXECUTOR == 'SequentialExecutor':
+    from airflow.executors.sequential_executor import SequentialExecutor
     DEFAULT_EXECUTOR = SequentialExecutor()
 else:
     raise Exception("Executor {0} not supported.".format(_EXECUTOR))
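
Moving the executor imports inside the matching branch means an unused executor's dependencies (notably celery) are never imported. Which branch runs is driven by the `EXECUTOR` key read via `conf.get('core', 'EXECUTOR')`; assuming `airflow.settings.conf` is a ConfigParser-style object over an ini file (the exact file path is deployment-specific), the relevant entry would look like:

```ini
# assumed config entry; the key matches conf.get('core', 'EXECUTOR') above
[core]
EXECUTOR = LocalExecutor
```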