Merge pull request #73 from mistercrunch/cmdline
Improvements to command line tools: test, list_dags, list_tasks
This commit is contained in:
Коммит
1789dd088c
7
TODO.md
7
TODO.md
|
@ -9,9 +9,6 @@ TODO
|
|||
* For each existing operator
|
||||
|
||||
#### Command line
|
||||
* `airflow test dag_id task_id YYYY-MM-DD` (outputs log to stdout, doesn't care about dependencies, states)
|
||||
* `airflow list_dags`
|
||||
* `airflow list_tasks dag_id`
|
||||
* `airflow task_state dag_id task_id YYYY-MM-DD`
|
||||
|
||||
#### More Operators!
|
||||
|
@ -28,14 +25,14 @@ TODO
|
|||
|
||||
#### Backend
|
||||
* CeleryExecutor
|
||||
* Master to derive BaseJob
|
||||
* Clear should kill running jobs
|
||||
* Mysql port should carry through
|
||||
* Mysql port should carry through (using default now)
|
||||
|
||||
#### Misc
|
||||
* Write a hypervisor that looks for dead jobs without a heartbeat and kills them
|
||||
* Authentication with Flask-Login and Flask-Principal
|
||||
* email_on_retry
|
||||
* Naming for the DatabaseConnection model was shortsighted, the same model can be used for any external connections (FTP, samba, ...), rename the model to Connection.
|
||||
|
||||
#### Wishlist
|
||||
* Support for cron-like syntax (0 * * * *) using the croniter library
|
||||
|
|
|
@ -86,6 +86,37 @@ def run(args):
|
|||
force=args.force,
|
||||
ignore_dependencies=args.ignore_dependencies)
|
||||
|
||||
def list_dags(args):
    """Print the id of every DAG the DagBag discovered, one per line,
    in lexicographic order.

    :param args: parsed argparse namespace (no fields are read here)
    """
    for dag_id in sorted(DagBag().dags):
        print(dag_id)
|
||||
|
||||
def list_tasks(args):
    """Print the tasks of a DAG: a tree view with ``--tree``, otherwise
    the sorted task_ids, one per line.

    :param args: parsed argparse namespace; reads ``dag_id`` and ``tree``
    :raises Exception: if ``args.dag_id`` is not found in the DagBag
    """
    dagbag = DagBag()
    if args.dag_id not in dagbag.dags:
        raise Exception('dag_id could not be found')
    dag = dagbag.dags[args.dag_id]
    if args.tree:
        dag.tree_view()
    else:
        # Sort exactly once; the previous code sorted the task_ids and
        # then sorted the already-sorted list again before joining.
        print("\n".join(sorted(t.task_id for t in dag.tasks)))
|
||||
|
||||
def test(args):
    """Run a single task instance for one execution date, logging to
    stdout at DEBUG level, forcing the run and ignoring dependencies
    and any previously recorded state.

    :param args: parsed argparse namespace; reads ``execution_date``
        (parsed in place from string to datetime), ``subdir``,
        ``dag_id`` and ``task_id``
    :raises Exception: if ``args.dag_id`` is not found in the DagBag
    """
    # Route all log records to stdout so the task's output is visible.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
    root_logger.addHandler(handler)

    # Mutates the namespace: downstream code expects a datetime here.
    args.execution_date = dateutil.parser.parse(args.execution_date)

    dagbag = DagBag(args.subdir)
    if args.dag_id not in dagbag.dags:
        raise Exception('dag_id could not be found')
    task = dagbag.dags[args.dag_id].get_task(task_id=args.task_id)
    TaskInstance(task, args.execution_date).run(
        force=True, ignore_dependencies=True)
|
||||
|
||||
def clear(args):
|
||||
logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
|
||||
|
@ -281,6 +312,17 @@ if __name__ == '__main__':
|
|||
help="Serialized pickle object of the entire dag (used internally)")
|
||||
parser_run.set_defaults(func=run)
|
||||
|
||||
ht = "Test a task instance"
|
||||
parser_test = subparsers.add_parser('test', help=ht)
|
||||
parser_test.add_argument("dag_id", help="The id of the dag to run")
|
||||
parser_test.add_argument("task_id", help="The task_id to run")
|
||||
parser_test.add_argument(
|
||||
"execution_date", help="The execution date to run")
|
||||
parser_test.add_argument(
|
||||
"-sd", "--subdir", help=subdir_help,
|
||||
default=conf.get('core', 'DAGS_FOLDER'))
|
||||
parser_test.set_defaults(func=test)
|
||||
|
||||
ht = "Start a Airflow webserver instance"
|
||||
parser_webserver = subparsers.add_parser('webserver', help=ht)
|
||||
parser_webserver.add_argument(
|
||||
|
@ -310,6 +352,18 @@ if __name__ == '__main__':
|
|||
parser_initdb = subparsers.add_parser('initdb', help=ht)
|
||||
parser_initdb.set_defaults(func=initdb)
|
||||
|
||||
# Register the `list_dags` and `list_tasks` subcommands.
ht = "List the DAGs"
parser_list_dags = subparsers.add_parser('list_dags', help=ht)
parser_list_dags.set_defaults(func=list_dags)

# Fix user-facing help text: "whithin" -> "within".
ht = "List the tasks within a DAG"
parser_list_tasks = subparsers.add_parser('list_tasks', help=ht)
parser_list_tasks.add_argument(
    "-t", "--tree", help="Tree view", action="store_true")
parser_list_tasks.add_argument(
    "dag_id", help="The id of the dag")
parser_list_tasks.set_defaults(func=list_tasks)
|
||||
|
||||
ht = "Start a Celery worker node"
|
||||
parser_worker = subparsers.add_parser('worker', help=ht)
|
||||
parser_worker.set_defaults(func=worker)
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
import logging
|
||||
|
||||
from airflow.settings import conf
|
||||
from airflow.executors.local_executor import LocalExecutor
|
||||
from airflow.executors.celery_executor import CeleryExecutor
|
||||
from airflow.executors.sequential_executor import SequentialExecutor
|
||||
|
||||
# Instantiate the executor backend named in the [core] section of the
# configuration. Each backend is imported inside its own branch, so only
# the selected backend's module is ever imported.
_EXECUTOR = conf.get('core', 'EXECUTOR')

# The branches are mutually exclusive string comparisons, so their order
# does not affect which executor is chosen.
if _EXECUTOR == 'SequentialExecutor':
    from airflow.executors.sequential_executor import SequentialExecutor
    DEFAULT_EXECUTOR = SequentialExecutor()
elif _EXECUTOR == 'LocalExecutor':
    from airflow.executors.local_executor import LocalExecutor
    DEFAULT_EXECUTOR = LocalExecutor()
elif _EXECUTOR == 'CeleryExecutor':
    from airflow.executors.celery_executor import CeleryExecutor
    DEFAULT_EXECUTOR = CeleryExecutor()
else:
    # Unknown value: fail loudly at import time rather than at job start.
    raise Exception("Executor {0} not supported.".format(_EXECUTOR))
|
||||
|
|
Загрузка…
Ссылка в новой задаче