Rename `[scheduler] max_threads` to `[scheduler] parsing_processes` (#12605)
From Airflow 2.0, the `max_threads` config option under the `[scheduler]` section has been renamed to `parsing_processes`. This aligns the name with what the code actually does: the Scheduler launches the number of processes defined by `[scheduler] parsing_processes` to parse DAG files, calculate the next DagRun date for each DAG, serialize the DAGs, and store them in the DB.
Parent: c457c975b8
Commit: 486134426b
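As a quick illustration (not part of the commit itself), this is how the renamed option is read on Airflow 2.0; thanks to the deprecation mapping added below, the value also falls back to an old `max_threads` setting with a `DeprecationWarning`:

```python
from airflow.configuration import conf

# Read the option under its new name. If only the deprecated
# [scheduler] max_threads is set, Airflow 2.0 falls back to it and warns.
parsing_processes = conf.getint('scheduler', 'parsing_processes')
print(f"DAG files will be parsed by {parsing_processes} processes")
```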
```diff
@@ -52,6 +52,14 @@ assists users migrating to a new version.
 
 ## Master
 
+### `[scheduler] max_threads` config has been renamed to `[scheduler] parsing_processes`
+
+From Airflow 2.0, `max_threads` config under `[scheduler]` section has been renamed to `parsing_processes`.
+
+This is to align the name with the actual code where the Scheduler launches the number of processes defined by
+`[scheduler] parsing_processes` to Parse DAG files, calculates next DagRun date for each DAG,
+serialize them and store them in the DB.
+
 ### Unify user session lifetime configuration
 
 In previous version of Airflow user session lifetime could be configured by
```
```diff
@@ -1732,10 +1732,10 @@
       version_added: 2.0.0
       type: boolean
       default: ~
-    - name: max_threads
+    - name: parsing_processes
       description: |
-        The scheduler can run multiple threads in parallel to schedule dags.
-        This defines how many threads will run.
+        The scheduler can run multiple processes in parallel to parse dags.
+        This defines how many processes will run.
       version_added: ~
       type: string
       example: ~
```
```diff
@@ -866,9 +866,9 @@ use_row_level_locking = True
 # Default: True
 # schedule_after_task_execution =
 
-# The scheduler can run multiple threads in parallel to schedule dags.
-# This defines how many threads will run.
-max_threads = 2
+# The scheduler can run multiple processes in parallel to parse dags.
+# This defines how many processes will run.
+parsing_processes = 2
 
 # Turn off scheduler use of cron intervals by setting this to False.
 # DAGs submitted manually in the web UI or with trigger_dag will still run.
```
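Besides editing `airflow.cfg` as above, the option can also be supplied through Airflow's standard `AIRFLOW__{SECTION}__{KEY}` environment-variable convention. A small sketch (the value `4` is an arbitrary example):

```python
import os

# Equivalent to setting parsing_processes = 4 under [scheduler] in airflow.cfg;
# the variable must be set before airflow.configuration is first imported.
os.environ['AIRFLOW__SCHEDULER__PARSING_PROCESSES'] = '4'

from airflow.configuration import conf

print(conf.getint('scheduler', 'parsing_processes'))  # -> 4
```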
```diff
@@ -105,7 +105,7 @@ job_heartbeat_sec = 1
 schedule_after_task_execution = False
 scheduler_heartbeat_sec = 5
 scheduler_health_check_threshold = 30
-max_threads = 2
+parsing_processes = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
 dag_dir_list_interval = 0
```
```diff
@@ -163,6 +163,7 @@ class AirflowConfigParser(ConfigParser):  # pylint: disable=too-many-ancestors
         ('metrics', 'statsd_datadog_enabled'): ('scheduler', 'statsd_datadog_enabled'),
         ('metrics', 'statsd_datadog_tags'): ('scheduler', 'statsd_datadog_tags'),
         ('metrics', 'statsd_custom_client_path'): ('scheduler', 'statsd_custom_client_path'),
+        ('scheduler', 'parsing_processes'): ('scheduler', 'max_threads'),
     }
 
     # A mapping of old default values that we want to change and warn the user
```
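The mapping above is what keeps the old name working. A minimal sketch of that fallback pattern, using hypothetical standalone names rather than Airflow's actual `AirflowConfigParser` internals:

```python
import warnings
from configparser import ConfigParser

# (new_section, new_key) -> (old_section, old_key), mirroring the entry above.
DEPRECATED_OPTIONS = {
    ('scheduler', 'parsing_processes'): ('scheduler', 'max_threads'),
}

def get_option(parser: ConfigParser, section: str, key: str) -> str:
    """Return an option's value, falling back to its deprecated name."""
    if parser.has_option(section, key):
        return parser.get(section, key)
    old = DEPRECATED_OPTIONS.get((section, key))
    if old is not None and parser.has_option(*old):
        warnings.warn(
            f"Option [{old[0]}] {old[1]} is deprecated; use [{section}] {key}.",
            DeprecationWarning,
        )
        return parser.get(*old)
    raise KeyError(f"No option [{section}] {key}")
```

With a config that only sets `max_threads = 2`, `get_option(parser, 'scheduler', 'parsing_processes')` returns `'2'` and emits a `DeprecationWarning`.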
```diff
@@ -510,10 +510,10 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
         self._async_mode = async_mode
         self._parsing_start_time: Optional[datetime] = None
 
-        self._parallelism = conf.getint('scheduler', 'max_threads')
+        self._parallelism = conf.getint('scheduler', 'parsing_processes')
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and self._parallelism > 1:
             self.log.warning(
-                "Because we cannot use more than 1 thread (max_threads = "
+                "Because we cannot use more than 1 thread (parsing_processes = "
                 "%d ) when using sqlite. So we set parallelism to 1.",
                 self._parallelism,
             )
```
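For clarity, the guard in this hunk reduces to the following standalone logic (a sketch with hypothetical names, not the actual `DagFileProcessorManager` code):

```python
def effective_parallelism(parsing_processes: int, sql_alchemy_conn: str) -> int:
    # SQLite does not support concurrent writers, so the manager caps the
    # number of DAG-parsing processes at 1 regardless of the configured value.
    if 'sqlite' in sql_alchemy_conn and parsing_processes > 1:
        return 1
    return parsing_processes

assert effective_parallelism(2, 'sqlite:////tmp/airflow.db') == 1
assert effective_parallelism(2, 'postgresql://airflow@localhost/airflow') == 2
```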
```diff
@@ -205,8 +205,11 @@ This means ``explicit_defaults_for_timestamp`` is disabled in your mysql server
 How to reduce airflow dag scheduling latency in production?
 -----------------------------------------------------------
 
-- ``max_threads``: Scheduler will spawn multiple threads in parallel to schedule dags. This is controlled by ``max_threads`` with default value of 2. User should increase this value to a larger value (e.g numbers of cpus where scheduler runs - 1) in production.
-- If you're using Airflow 1.10.x, consider moving to Airflow 2, which has reduced dag scheduling latency dramatically, and allows for running multiple schedulers.
+- ``parsing_processes``: Scheduler will spawn multiple threads in parallel to parse dags.
+  This is controlled by ``parsing_processes`` with default value of 2.
+  User should increase this value to a larger value (e.g numbers of cpus where scheduler runs + 1) in production.
+- If you're using Airflow 1.10.x, consider moving to Airflow 2, which has reduced dag scheduling latency dramatically,
+  and allows for running multiple schedulers.
 
 Why next_ds or prev_ds might not contain expected values?
 ---------------------------------------------------------
```
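The sizing rule in the FAQ text above (number of CPUs on the scheduler host plus one) can be computed mechanically. A hedged helper, not part of the commit:

```python
import os

def suggested_parsing_processes() -> int:
    # FAQ guidance: CPU count of the machine running the scheduler, plus one.
    # os.cpu_count() can return None; assume 2 CPUs in that case.
    return (os.cpu_count() or 2) + 1

print(f"[scheduler]\nparsing_processes = {suggested_parsing_processes()}")
```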
```diff
@@ -53,4 +53,4 @@ _test_only_string = this is a test
 [scheduler]
 job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5
-max_threads = 2
+parsing_processes = 2
```
```diff
@@ -238,7 +238,7 @@ class TestDagFileProcessorManager(unittest.TestCase):
         file processors until the next zombie detection logic is invoked.
         """
         test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py')
-        with conf_vars({('scheduler', 'max_threads'): '1', ('core', 'load_examples'): 'False'}):
+        with conf_vars({('scheduler', 'parsing_processes'): '1', ('core', 'load_examples'): 'False'}):
            dagbag = DagBag(test_dag_path, read_dags_from_db=False)
            with create_session() as session:
                session.query(LJ).delete()
```
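For context, `conf_vars` is a helper from Airflow's test suite that temporarily overrides configuration options for the duration of a `with` block; the import path below reflects the Airflow source tree at the time and may differ between versions:

```python
from tests.test_utils.config import conf_vars

from airflow.configuration import conf

with conf_vars({('scheduler', 'parsing_processes'): '1'}):
    # The override is visible only inside the block...
    assert conf.getint('scheduler', 'parsing_processes') == 1
# ...and the previous value is restored on exit.
```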