Add displaying multiple dates in airflow next_execution command (#9072)

The "next_execution" cli sub-command now accepts an optional number of
executions to be returned. This is particularly useful for checking
non-regular schedule intervals, such as those created by some cron
expressions.

Co-authored-by: Kamil Breguła <mik-laj@users.noreply.github.com>
This commit is contained in:
Mauricio De Diana 2020-05-31 16:31:58 -03:00 коммит произвёл GitHub
Родитель 93b8f3e48d
Коммит c002b25e37
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 60 добавлений и 22 удалений

Просмотреть файл

@ -99,6 +99,17 @@ class Arg:
parser.add_argument(*self.flags, **self.kwargs)
def positive_int(value):
"""Define a positive int type for an argument."""
try:
value = int(value)
if value > 0:
return value
except ValueError:
pass
raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'")
# Shared
ARG_DAG_ID = Arg(
("dag_id",),
@ -182,6 +193,13 @@ ARG_LIMIT = Arg(
("--limit",),
help="Return a limited number of records")
# next_execution
ARG_NUM_EXECUTIONS = Arg(
("-n", "--num-executions"),
default=1,
type=positive_int,
help="The number of next execution datetimes to show")
# backfill
ARG_MARK_SUCCESS = Arg(
("-m", "--mark-success"),
@ -793,9 +811,10 @@ DAGS_COMMANDS = (
),
ActionCommand(
name='next_execution',
help="Get the next execution datetime of a DAG",
help="Get the next execution datetimes of a DAG. It returns one execution unless the "
"num-executions option is given",
func=lazy_load_command('airflow.cli.commands.dag_command.dag_next_execution'),
args=(ARG_DAG_ID, ARG_SUBDIR),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_NUM_EXECUTIONS),
),
ActionCommand(
name='pause',

Просмотреть файл

@ -275,7 +275,7 @@ def dag_next_execution(args):
dag = get_dag(args.subdir, args.dag_id)
if dag.get_is_paused():
print("[INFO] Please be reminded this DAG is PAUSED now.")
print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr)
latest_execution_date = dag.get_latest_execution_date()
if latest_execution_date:
@ -283,11 +283,16 @@ def dag_next_execution(args):
if next_execution_dttm is None:
print("[WARN] No following schedule can be found. " +
"This DAG may have schedule interval '@once' or `None`.")
"This DAG may have schedule interval '@once' or `None`.", file=sys.stderr)
print(None)
else:
print(next_execution_dttm)
print(next_execution_dttm)
for _ in range(1, args.num_executions):
next_execution_dttm = dag.following_schedule(next_execution_dttm)
print(next_execution_dttm)
else:
print("[WARN] Only applicable when there is execution record found for the DAG.")
print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr)
print(None)

Просмотреть файл

@ -20,11 +20,10 @@ import io
import os
import tempfile
import unittest
from datetime import datetime, time, timedelta
from datetime import datetime, timedelta
import mock
import pytest
import pytz
from airflow import settings
from airflow.cli import cli_parser
@ -249,10 +248,10 @@ class TestCliDags(unittest.TestCase):
dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
dr.delete(synchronize_session=False)
# Test None output
args = self.parser.parse_args(['dags',
'next_execution',
dag_ids[0]])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_next_execution(args)
out = temp_stdout.getvalue()
@ -262,23 +261,16 @@ class TestCliDags(unittest.TestCase):
# The details below is determined by the schedule_interval of example DAGs
now = DEFAULT_DATE
next_execution_time_for_dag1 = pytz.utc.localize(
datetime.combine(
now.date() + timedelta(days=1),
time(0)
)
)
next_execution_time_for_dag2 = now + timedelta(hours=4)
expected_output = [str(next_execution_time_for_dag1),
str(next_execution_time_for_dag2),
expected_output = [str(now + timedelta(days=1)),
str(now + timedelta(hours=4)),
"None",
"None"]
expected_output_2 = [str(now + timedelta(days=1)) + os.linesep + str(now + timedelta(days=2)),
str(now + timedelta(hours=4)) + os.linesep + str(now + timedelta(hours=8)),
"None",
"None"]
for i, dag_id in enumerate(dag_ids):
args = self.parser.parse_args(['dags',
'next_execution',
dag_id])
dag = self.dagbag.dags[dag_id]
# Create a DagRun for each DAG, to prepare for next step
dag.create_dagrun(
@ -288,11 +280,26 @@ class TestCliDags(unittest.TestCase):
state=State.FAILED
)
# Test num-executions = 1 (default)
args = self.parser.parse_args(['dags',
'next_execution',
dag_id])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_next_execution(args)
out = temp_stdout.getvalue()
self.assertIn(expected_output[i], out)
# Test num-executions = 2
args = self.parser.parse_args(['dags',
'next_execution',
dag_id,
'--num-executions',
'2'])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_next_execution(args)
out = temp_stdout.getvalue()
self.assertIn(expected_output_2[i], out)
# Clean up before leaving
with create_session() as session:
dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))

Просмотреть файл

@ -174,3 +174,10 @@ class TestCli(TestCase):
for cmd_args in all_command_as_args:
with self.assertRaises(SystemExit):
parser.parse_args([*cmd_args, '--help'])
def test_positive_int(self):
self.assertEqual(1, cli_parser.positive_int('1'))
with self.assertRaises(argparse.ArgumentTypeError):
cli_parser.positive_int('0')
cli_parser.positive_int('-1')