[AIRFLOW-3515] Remove the run_duration option (#4320)
This commit is contained in:
Родитель
76d755d777
Коммит
327860fe4f
|
@ -24,6 +24,10 @@ assists users migrating to a new version.
|
|||
|
||||
## Airflow Master
|
||||
|
||||
#### Remove run_duration
|
||||
|
||||
We should not use the `run_duration` option anymore. This used to be for restarting the scheduler from time to time, but right now the scheduler is getting more stable and therefore using this setting is considered bad and might cause an inconsistent state.
|
||||
|
||||
### Modification to config file discovery
|
||||
|
||||
If the `AIRFLOW_CONFIG` environment variable was not set and the
|
||||
|
|
|
@ -970,7 +970,6 @@ def scheduler(args):
|
|||
job = jobs.SchedulerJob(
|
||||
dag_id=args.dag_id,
|
||||
subdir=process_subdir(args.subdir),
|
||||
run_duration=args.run_duration,
|
||||
num_runs=args.num_runs,
|
||||
do_pickle=args.do_pickle)
|
||||
|
||||
|
@ -1768,10 +1767,6 @@ class CLIFactory(object):
|
|||
"stderr."),
|
||||
# scheduler
|
||||
'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
|
||||
'run_duration': Arg(
|
||||
("-r", "--run-duration"),
|
||||
default=None, type=int,
|
||||
help="Set number of seconds to execute before exiting"),
|
||||
'num_runs': Arg(
|
||||
("-n", "--num_runs"),
|
||||
default=-1, type=int,
|
||||
|
@ -2057,7 +2052,7 @@ class CLIFactory(object):
|
|||
}, {
|
||||
'func': scheduler,
|
||||
'help': "Start a scheduler instance",
|
||||
'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
|
||||
'args': ('dag_id_opt', 'subdir', 'num_runs',
|
||||
'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
|
||||
'log_file'),
|
||||
}, {
|
||||
|
|
|
@ -456,10 +456,6 @@ job_heartbeat_sec = 5
|
|||
# how often the scheduler should run (in seconds).
|
||||
scheduler_heartbeat_sec = 5
|
||||
|
||||
# after how much time should the scheduler terminate in seconds
|
||||
# -1 indicates to run continuously (see also num_runs)
|
||||
run_duration = -1
|
||||
|
||||
# after how much time (seconds) new DAGs should be picked up from the filesystem
|
||||
min_file_process_interval = 0
|
||||
|
||||
|
|
|
@ -545,7 +545,6 @@ class SchedulerJob(BaseJob):
|
|||
subdir=settings.DAGS_FOLDER,
|
||||
num_runs=-1,
|
||||
processor_poll_interval=1.0,
|
||||
run_duration=None,
|
||||
do_pickle=False,
|
||||
log=None,
|
||||
*args, **kwargs):
|
||||
|
@ -563,8 +562,6 @@ class SchedulerJob(BaseJob):
|
|||
:param processor_poll_interval: The number of seconds to wait between
|
||||
polls of running processors
|
||||
:type processor_poll_interval: int
|
||||
:param run_duration: how long to run (in seconds) before exiting
|
||||
:type run_duration: int
|
||||
:param do_pickle: once a DAG object is obtained by executing the Python
|
||||
file, whether to serialize the DAG object to the DB
|
||||
:type do_pickle: bool
|
||||
|
@ -578,7 +575,6 @@ class SchedulerJob(BaseJob):
|
|||
self.subdir = subdir
|
||||
|
||||
self.num_runs = num_runs
|
||||
self.run_duration = run_duration
|
||||
self._processor_poll_interval = processor_poll_interval
|
||||
|
||||
self.do_pickle = do_pickle
|
||||
|
@ -595,10 +591,6 @@ class SchedulerJob(BaseJob):
|
|||
self.using_sqlite = True
|
||||
|
||||
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
|
||||
if run_duration is None:
|
||||
self.run_duration = conf.getint('scheduler',
|
||||
'run_duration')
|
||||
|
||||
self.processor_agent = None
|
||||
self._last_loop = False
|
||||
|
||||
|
@ -1499,7 +1491,6 @@ class SchedulerJob(BaseJob):
|
|||
(executors.LocalExecutor, executors.SequentialExecutor):
|
||||
pickle_dags = True
|
||||
|
||||
self.log.info("Running execute loop for %s seconds", self.run_duration)
|
||||
self.log.info("Processing each file at most %s times", self.num_runs)
|
||||
|
||||
# Build up a list of Python files that could contain DAGs
|
||||
|
@ -1562,8 +1553,7 @@ class SchedulerJob(BaseJob):
|
|||
last_self_heartbeat_time = timezone.utcnow()
|
||||
|
||||
# For the execute duration, parse and schedule DAGs
|
||||
while (timezone.utcnow() - execute_start_time).total_seconds() < \
|
||||
self.run_duration or self.run_duration < 0:
|
||||
while True:
|
||||
self.log.debug("Starting Loop...")
|
||||
loop_start_time = time.time()
|
||||
|
||||
|
|
|
@ -45,10 +45,6 @@ data:
|
|||
# how often the scheduler should run (in seconds).
|
||||
scheduler_heartbeat_sec = 5
|
||||
|
||||
# after how much time should the scheduler terminate in seconds
|
||||
# -1 indicates to run continuously (see also num_runs)
|
||||
run_duration = -1
|
||||
|
||||
# after how much time new DAGs should be picked up from the filesystem
|
||||
min_file_process_interval = 0
|
||||
|
||||
|
|
|
@ -1988,7 +1988,7 @@ class SchedulerJobTest(unittest.TestCase):
|
|||
session.commit()
|
||||
|
||||
dagbag = self._make_simple_dag_bag([dag1, dag2, dag3])
|
||||
scheduler = SchedulerJob(num_runs=0, run_duration=0)
|
||||
scheduler = SchedulerJob(num_runs=0)
|
||||
scheduler._change_state_for_tis_without_dagrun(
|
||||
simple_dag_bag=dagbag,
|
||||
old_states=[State.SCHEDULED, State.QUEUED],
|
||||
|
@ -2110,7 +2110,7 @@ class SchedulerJobTest(unittest.TestCase):
|
|||
|
||||
processor = mock.MagicMock()
|
||||
|
||||
scheduler = SchedulerJob(num_runs=0, run_duration=0)
|
||||
scheduler = SchedulerJob(num_runs=0)
|
||||
executor = TestExecutor()
|
||||
scheduler.executor = executor
|
||||
scheduler.processor_agent = processor
|
||||
|
@ -3059,30 +3059,6 @@ class SchedulerJobTest(unittest.TestCase):
|
|||
self.assertEqual(ti.try_number, 2)
|
||||
self.assertEqual(ti.state, State.UP_FOR_RETRY)
|
||||
|
||||
def test_scheduler_run_duration(self):
|
||||
"""
|
||||
Verifies that the scheduler run duration limit is followed.
|
||||
"""
|
||||
dag_id = 'test_start_date_scheduling'
|
||||
dag = self.dagbag.get_dag(dag_id)
|
||||
dag.clear()
|
||||
self.assertTrue(dag.start_date > DEFAULT_DATE)
|
||||
|
||||
expected_run_duration = 5
|
||||
start_time = timezone.utcnow()
|
||||
scheduler = SchedulerJob(dag_id,
|
||||
run_duration=expected_run_duration)
|
||||
scheduler.run()
|
||||
end_time = timezone.utcnow()
|
||||
|
||||
run_duration = (end_time - start_time).total_seconds()
|
||||
logging.info("Test ran in %.2fs, expected %.2fs",
|
||||
run_duration,
|
||||
expected_run_duration)
|
||||
# 5s to wait for child process to exit, 1s dummy sleep
|
||||
# in scheduler loop to prevent excessive logs and 1s for last loop to finish.
|
||||
self.assertLess(run_duration - expected_run_duration, 6.0)
|
||||
|
||||
def test_dag_with_system_exit(self):
|
||||
"""
|
||||
Test to check that a DAG with a system.exit() doesn't break the scheduler.
|
||||
|
|
Загрузка…
Ссылка в новой задаче