diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 48e11e31c3..5f03c481cb 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -99,6 +99,17 @@ class Arg: parser.add_argument(*self.flags, **self.kwargs) +def positive_int(value): + """Define a positive int type for an argument.""" + try: + value = int(value) + if value > 0: + return value + except ValueError: + pass + raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'") + + # Shared ARG_DAG_ID = Arg( ("dag_id",), @@ -182,6 +193,13 @@ ARG_LIMIT = Arg( ("--limit",), help="Return a limited number of records") +# next_execution +ARG_NUM_EXECUTIONS = Arg( + ("-n", "--num-executions"), + default=1, + type=positive_int, + help="The number of next execution datetimes to show") + # backfill ARG_MARK_SUCCESS = Arg( ("-m", "--mark-success"), @@ -793,9 +811,10 @@ DAGS_COMMANDS = ( ), ActionCommand( name='next_execution', - help="Get the next execution datetime of a DAG", + help="Get the next execution datetimes of a DAG. It returns one execution unless the " + "num-executions option is given", func=lazy_load_command('airflow.cli.commands.dag_command.dag_next_execution'), - args=(ARG_DAG_ID, ARG_SUBDIR), + args=(ARG_DAG_ID, ARG_SUBDIR, ARG_NUM_EXECUTIONS), ), ActionCommand( name='pause', diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 148d0f51e6..685ca21c78 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -275,7 +275,7 @@ def dag_next_execution(args): dag = get_dag(args.subdir, args.dag_id) if dag.get_is_paused(): - print("[INFO] Please be reminded this DAG is PAUSED now.") + print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr) latest_execution_date = dag.get_latest_execution_date() if latest_execution_date: @@ -283,11 +283,16 @@ def dag_next_execution(args): if next_execution_dttm is None: print("[WARN] No following schedule can be found. " + - "This DAG may have schedule interval '@once' or `None`.") + "This DAG may have schedule interval '@once' or `None`.", file=sys.stderr) + print(None) + else: + print(next_execution_dttm) - print(next_execution_dttm) + for _ in range(1, args.num_executions): + next_execution_dttm = dag.following_schedule(next_execution_dttm) + print(next_execution_dttm) else: - print("[WARN] Only applicable when there is execution record found for the DAG.") + print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr) print(None) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 80b7852fb4..9a5e50fadf 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -20,11 +20,10 @@ import io import os import tempfile import unittest -from datetime import datetime, time, timedelta +from datetime import datetime, timedelta import mock import pytest -import pytz from airflow import settings from airflow.cli import cli_parser @@ -249,10 +248,10 @@ class TestCliDags(unittest.TestCase): dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)) dr.delete(synchronize_session=False) + # Test None output args = self.parser.parse_args(['dags', 'next_execution', dag_ids[0]]) - with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: dag_command.dag_next_execution(args) out = temp_stdout.getvalue() @@ -262,23 +261,16 @@ class TestCliDags(unittest.TestCase): # The details below is determined by the schedule_interval of example DAGs now = DEFAULT_DATE - next_execution_time_for_dag1 = pytz.utc.localize( - datetime.combine( - now.date() + timedelta(days=1), - time(0) - ) - ) - next_execution_time_for_dag2 = now + timedelta(hours=4) - expected_output = [str(next_execution_time_for_dag1), - str(next_execution_time_for_dag2), + expected_output = [str(now + timedelta(days=1)), + str(now + timedelta(hours=4)), "None", "None"] + expected_output_2 = [str(now + timedelta(days=1)) + os.linesep + str(now + timedelta(days=2)), + str(now + timedelta(hours=4)) + os.linesep + str(now + timedelta(hours=8)), + "None", + "None"] for i, dag_id in enumerate(dag_ids): - args = self.parser.parse_args(['dags', - 'next_execution', - dag_id]) - dag = self.dagbag.dags[dag_id] # Create a DagRun for each DAG, to prepare for next step dag.create_dagrun( @@ -288,11 +280,26 @@ class TestCliDags(unittest.TestCase): state=State.FAILED ) + # Test num-executions = 1 (default) + args = self.parser.parse_args(['dags', + 'next_execution', + dag_id]) with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: dag_command.dag_next_execution(args) out = temp_stdout.getvalue() self.assertIn(expected_output[i], out) + # Test num-executions = 2 + args = self.parser.parse_args(['dags', + 'next_execution', + dag_id, + '--num-executions', + '2']) + with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: + dag_command.dag_next_execution(args) + out = temp_stdout.getvalue() + self.assertIn(expected_output_2[i], out) + # Clean up before leaving with create_session() as session: dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)) diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py index 6a7d7140b7..e608859632 100644 --- a/tests/cli/test_cli_parser.py +++ b/tests/cli/test_cli_parser.py @@ -174,3 +174,10 @@ class TestCli(TestCase): for cmd_args in all_command_as_args: with self.assertRaises(SystemExit): parser.parse_args([*cmd_args, '--help']) + + def test_positive_int(self): + self.assertEqual(1, cli_parser.positive_int('1')) + + with self.assertRaises(argparse.ArgumentTypeError): + cli_parser.positive_int('0') + cli_parser.positive_int('-1')