diff --git a/UPDATING.md b/UPDATING.md
index 5a5fce681e..7114dc99c7 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -24,6 +24,10 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Remove run_duration
+
+The scheduler's `run_duration` option has been removed and should no longer be used. It existed so that the scheduler could be restarted from time to time, but the scheduler has since become stable enough that periodic restarts are unnecessary, and relying on this setting can leave the scheduler in an inconsistent state.
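+
+For example, the entry below is the one that was shipped in the old default `airflow.cfg`; if it is still present in your configuration it can simply be removed:
+
+```ini
+[scheduler]
+# after how much time should the scheduler terminate in seconds
+# -1 indicates to run continuously (see also num_runs)
+run_duration = -1
+```
+
+The `-r`/`--run-duration` option of the `airflow scheduler` command has been removed as well; the existing `-n`/`--num_runs` option remains available if you need the scheduler to exit after a limited number of runs.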
+
 ### Modification to config file discovery
 
 If the `AIRFLOW_CONFIG` environment variable was not set and the
@@ -33,7 +37,7 @@ will discover its config file using the `$AIRFLOW_CONFIG` and `$AIRFLOW_HOME`
 environment variables rather than checking for the presence of a file.
 
 ### Modification to `ts_nodash` macro
-`ts_nodash` previously contained TimeZone information alongwith execution date. For Example: `20150101T000000+0000`. This is not user-friendly for file or folder names which was a popular use case for `ts_nodash`. Hence this behavior has been changed and using `ts_nodash` will no longer contain TimeZone information, restoring the pre-1.10 behavior of this macro. And a new macro `ts_nodash_with_tz` has been added which can be used to get a string with execution date and timezone info without dashes.
+`ts_nodash` previously contained TimeZone information along with the execution date, for example `20150101T000000+0000`. This is not user-friendly for file or folder names, which was a popular use case for `ts_nodash`. Hence this behavior has been changed and `ts_nodash` no longer contains TimeZone information, restoring the pre-1.10 behavior of this macro. A new macro `ts_nodash_with_tz` has been added which can be used to get a string with the execution date and timezone info without dashes.
 
 Examples:
   * `ts_nodash`: `20150101T000000`
@@ -206,7 +210,7 @@ There are five roles created for Airflow by default: Admin, User, Op, Viewer, an
 - All ModelViews in Flask-AppBuilder follow a different pattern from Flask-Admin. The `/admin` part of the URL path will no longer exist. For example: `/admin/connection` becomes `/connection/list`, `/admin/connection/new` becomes `/connection/add`, `/admin/connection/edit` becomes `/connection/edit`, etc.
 - Due to security concerns, the new webserver will no longer support the features in the `Data Profiling` menu of old UI, including `Ad Hoc Query`, `Charts`, and `Known Events`.
 - HiveServer2Hook.get_results() always returns a list of tuples, even when a single column is queried, as per Python API 2.
-- **UTC is now the default timezone**: Either reconfigure your workflows scheduling in UTC or set `default_timezone` as explained in https://airflow.apache.org/timezone.html#default-time-zone
+- **UTC is now the default timezone**: Either reconfigure your workflows scheduling in UTC or set `default_timezone` as explained in https://airflow.apache.org/timezone.html#default-time-zone
 
 ### airflow.contrib.sensors.hdfs_sensors renamed to airflow.contrib.sensors.hdfs_sensor
 
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 877bf34e20..3f3bdc3ad0 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -970,7 +970,6 @@ def scheduler(args):
     job = jobs.SchedulerJob(
         dag_id=args.dag_id,
         subdir=process_subdir(args.subdir),
-        run_duration=args.run_duration,
         num_runs=args.num_runs,
         do_pickle=args.do_pickle)
 
@@ -1768,10 +1767,6 @@ class CLIFactory(object):
             "stderr."),
         # scheduler
         'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
-        'run_duration': Arg(
-            ("-r", "--run-duration"),
-            default=None, type=int,
-            help="Set number of seconds to execute before exiting"),
         'num_runs': Arg(
             ("-n", "--num_runs"),
             default=-1, type=int,
@@ -2057,7 +2052,7 @@ class CLIFactory(object):
         }, {
             'func': scheduler,
             'help': "Start a scheduler instance",
-            'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
+            'args': ('dag_id_opt', 'subdir', 'num_runs',
                      'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
                      'log_file'),
         }, {
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index b924c6ecb0..99c08908fc 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -456,10 +456,6 @@ job_heartbeat_sec = 5
 # how often the scheduler should run (in seconds).
 scheduler_heartbeat_sec = 5
 
-# after how much time should the scheduler terminate in seconds
-# -1 indicates to run continuously (see also num_runs)
-run_duration = -1
-
 # after how much time (seconds) a new DAGs should be picked up from the filesystem
 min_file_process_interval = 0
 
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f71fa3cd63..8771405c48 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -545,7 +545,6 @@ class SchedulerJob(BaseJob):
             subdir=settings.DAGS_FOLDER,
             num_runs=-1,
             processor_poll_interval=1.0,
-            run_duration=None,
             do_pickle=False,
             log=None,
             *args, **kwargs):
@@ -563,8 +562,6 @@ class SchedulerJob(BaseJob):
         :param processor_poll_interval: The number of seconds to wait between
             polls of running processors
         :type processor_poll_interval: int
-        :param run_duration: how long to run (in seconds) before exiting
-        :type run_duration: int
         :param do_pickle: once a DAG object is obtained by executing the
             Python file, whether to serialize the DAG object to the DB
         :type do_pickle: bool
@@ -578,7 +575,6 @@ class SchedulerJob(BaseJob):
         self.subdir = subdir
 
         self.num_runs = num_runs
-        self.run_duration = run_duration
         self._processor_poll_interval = processor_poll_interval
 
         self.do_pickle = do_pickle
@@ -595,10 +591,6 @@ class SchedulerJob(BaseJob):
             self.using_sqlite = True
 
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
-        if run_duration is None:
-            self.run_duration = conf.getint('scheduler',
-                                            'run_duration')
-
         self.processor_agent = None
         self._last_loop = False
 
@@ -1499,7 +1491,6 @@ class SchedulerJob(BaseJob):
                 (executors.LocalExecutor, executors.SequentialExecutor):
             pickle_dags = True
 
-        self.log.info("Running execute loop for %s seconds", self.run_duration)
         self.log.info("Processing each file at most %s times", self.num_runs)
 
         # Build up a list of Python files that could contain DAGs
@@ -1562,8 +1553,7 @@ class SchedulerJob(BaseJob):
         last_self_heartbeat_time = timezone.utcnow()
 
-        # For the execute duration, parse and schedule DAGs
-        while (timezone.utcnow() - execute_start_time).total_seconds() < \
-                self.run_duration or self.run_duration < 0:
+        # Parse and schedule DAGs in a continuous loop
+        while True:
             self.log.debug("Starting Loop...")
             loop_start_time = time.time()
 
diff --git a/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml b/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml
index 7761a6bcdc..4137140709 100644
--- a/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml
+++ b/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml
@@ -45,10 +45,6 @@ data:
     # how often the scheduler should run (in seconds).
     scheduler_heartbeat_sec = 5
 
-    # after how much time should the scheduler terminate in seconds
-    # -1 indicates to run continuously (see also num_runs)
-    run_duration = -1
-
     # after how much time a new DAGs should be picked up from the filesystem
     min_file_process_interval = 0
 
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index 75deba44e9..9094a580b5 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -1988,7 +1988,7 @@ class SchedulerJobTest(unittest.TestCase):
         session.commit()
 
         dagbag = self._make_simple_dag_bag([dag1, dag2, dag3])
-        scheduler = SchedulerJob(num_runs=0, run_duration=0)
+        scheduler = SchedulerJob(num_runs=0)
         scheduler._change_state_for_tis_without_dagrun(
             simple_dag_bag=dagbag,
             old_states=[State.SCHEDULED, State.QUEUED],
@@ -2110,7 +2110,7 @@ class SchedulerJobTest(unittest.TestCase):
 
         processor = mock.MagicMock()
 
-        scheduler = SchedulerJob(num_runs=0, run_duration=0)
+        scheduler = SchedulerJob(num_runs=0)
         executor = TestExecutor()
         scheduler.executor = executor
         scheduler.processor_agent = processor
@@ -3059,30 +3059,6 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertEqual(ti.try_number, 2)
         self.assertEqual(ti.state, State.UP_FOR_RETRY)
 
-    def test_scheduler_run_duration(self):
-        """
-        Verifies that the scheduler run duration limit is followed.
-        """
-        dag_id = 'test_start_date_scheduling'
-        dag = self.dagbag.get_dag(dag_id)
-        dag.clear()
-        self.assertTrue(dag.start_date > DEFAULT_DATE)
-
-        expected_run_duration = 5
-        start_time = timezone.utcnow()
-        scheduler = SchedulerJob(dag_id,
-                                 run_duration=expected_run_duration)
-        scheduler.run()
-        end_time = timezone.utcnow()
-
-        run_duration = (end_time - start_time).total_seconds()
-        logging.info("Test ran in %.2fs, expected %.2fs",
-                     run_duration,
-                     expected_run_duration)
-        # 5s to wait for child process to exit, 1s dummy sleep
-        # in scheduler loop to prevent excessive logs and 1s for last loop to finish.
-        self.assertLess(run_duration - expected_run_duration, 6.0)
-
     def test_dag_with_system_exit(self):
         """
         Test to check that a DAG with a system.exit() doesn't break the scheduler.