Rename DagBag.store_serialized_dags to Dagbag.read_dags_from_db (#9838)
This commit is contained in:
Родитель
b01d95ec22
Коммит
d008ff669d
34
UPDATING.md
34
UPDATING.md
|
@ -1442,6 +1442,40 @@ Now the `dag_id` will not appear repeated in the payload, and the response forma
|
|||
}
|
||||
```
|
||||
|
||||
### Change in DagBag signature
|
||||
|
||||
Passing `store_serialized_dags` argument to DagBag.__init__ and accessing `DagBag.store_serialized_dags` property
|
||||
are deprecated and will be removed in future versions.
|
||||
|
||||
|
||||
**Previous signature**:
|
||||
|
||||
```python
|
||||
DagBag(
|
||||
dag_folder=None,
|
||||
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
|
||||
safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
|
||||
store_serialized_dags=False
|
||||
):
|
||||
```
|
||||
|
||||
**current**:
|
||||
```python
|
||||
DagBag(
|
||||
dag_folder=None,
|
||||
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
|
||||
safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
|
||||
read_dags_from_db=False
|
||||
):
|
||||
```
|
||||
|
||||
If you were using positional arguments, it requires no change but if you were using keyword
|
||||
arguments, please change `store_serialized_dags` to `read_dags_from_db`.
|
||||
|
||||
Similarly, if you were using `DagBag().store_serialized_dags` property, change it to
|
||||
`DagBag().read_dags_from_db`.
|
||||
|
||||
|
||||
## Airflow 1.10.11
|
||||
|
||||
### Use NULL as default value for dag.description
|
||||
|
|
|
@ -32,7 +32,7 @@ def check_and_get_dag(dag_id: str, task_id: Optional[str] = None) -> DagModel:
|
|||
|
||||
dagbag = DagBag(
|
||||
dag_folder=dag_model.fileloc,
|
||||
store_serialized_dags=conf.getboolean('core', 'store_serialized_dags')
|
||||
read_dags_from_db=conf.getboolean('core', 'store_serialized_dags')
|
||||
)
|
||||
dag = dagbag.get_dag(dag_id) # prefetch dag if it is stored serialized
|
||||
if dag_id not in dagbag.dags:
|
||||
|
|
|
@ -120,7 +120,7 @@ def trigger_dag(
|
|||
return conf.getboolean('core', 'store_serialized_dags')
|
||||
dagbag = DagBag(
|
||||
dag_folder=dag_model.fileloc,
|
||||
store_serialized_dags=read_store_serialized_dags()
|
||||
read_dags_from_db=read_store_serialized_dags()
|
||||
)
|
||||
dag_run = DagRun()
|
||||
triggers = _trigger_dag(
|
||||
|
|
|
@ -23,6 +23,7 @@ import importlib.util
|
|||
import os
|
||||
import sys
|
||||
import textwrap
|
||||
import warnings
|
||||
import zipfile
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, NamedTuple, Optional
|
||||
|
@ -69,26 +70,35 @@ class DagBag(BaseDagBag, LoggingMixin):
|
|||
:param include_examples: whether to include the examples that ship
|
||||
with airflow or not
|
||||
:type include_examples: bool
|
||||
:param store_serialized_dags: Read DAGs from DB if store_serialized_dags is ``True``.
|
||||
:param read_dags_from_db: Read DAGs from DB if store_serialized_dags is ``True``.
|
||||
If ``False`` DAGs are read from python files. This property is not used when
|
||||
determining whether or not to write Serialized DAGs, that is done by checking
|
||||
the config ``store_serialized_dags``.
|
||||
:type store_serialized_dags: bool
|
||||
:type read_dags_from_db: bool
|
||||
"""
|
||||
|
||||
DAGBAG_IMPORT_TIMEOUT = conf.getint('core', 'DAGBAG_IMPORT_TIMEOUT')
|
||||
SCHEDULER_ZOMBIE_TASK_THRESHOLD = conf.getint('scheduler', 'scheduler_zombie_task_threshold')
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
dag_folder: Optional[str] = None,
|
||||
include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
|
||||
safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
|
||||
store_serialized_dags: bool = False,
|
||||
self,
|
||||
dag_folder: Optional[str] = None,
|
||||
include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
|
||||
safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
|
||||
read_dags_from_db: bool = False,
|
||||
store_serialized_dags: Optional[bool] = None,
|
||||
):
|
||||
# Avoid circular import
|
||||
from airflow.models.dag import DAG
|
||||
super().__init__()
|
||||
|
||||
if store_serialized_dags:
|
||||
warnings.warn(
|
||||
"The store_serialized_dags parameter has been deprecated. "
|
||||
"You should pass the read_dags_from_db parameter.",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
read_dags_from_db = store_serialized_dags
|
||||
|
||||
dag_folder = dag_folder or settings.DAGS_FOLDER
|
||||
self.dag_folder = dag_folder
|
||||
self.dags: Dict[str, DAG] = {}
|
||||
|
@ -96,7 +106,7 @@ class DagBag(BaseDagBag, LoggingMixin):
|
|||
self.file_last_changed: Dict[str, datetime] = {}
|
||||
self.import_errors: Dict[str, str] = {}
|
||||
self.has_logged = False
|
||||
self.store_serialized_dags = store_serialized_dags
|
||||
self.read_dags_from_db = read_dags_from_db
|
||||
|
||||
self.collect_dags(
|
||||
dag_folder=dag_folder,
|
||||
|
@ -109,6 +119,15 @@ class DagBag(BaseDagBag, LoggingMixin):
|
|||
"""
|
||||
return len(self.dags)
|
||||
|
||||
@property
|
||||
def store_serialized_dags(self) -> bool:
|
||||
"""Whether or not to read dags from DB"""
|
||||
warnings.warn(
|
||||
"The store_serialized_dags property has been deprecated. "
|
||||
"Use read_dags_from_db instead.", DeprecationWarning, stacklevel=2
|
||||
)
|
||||
return self.read_dags_from_db
|
||||
|
||||
@property
|
||||
def dag_ids(self) -> List[str]:
|
||||
return list(self.dags.keys())
|
||||
|
@ -123,8 +142,8 @@ class DagBag(BaseDagBag, LoggingMixin):
|
|||
# Avoid circular import
|
||||
from airflow.models.dag import DagModel
|
||||
|
||||
# Only read DAGs from DB if this dagbag is store_serialized_dags.
|
||||
if self.store_serialized_dags:
|
||||
# Only read DAGs from DB if this dagbag is read_dags_from_db.
|
||||
if self.read_dags_from_db:
|
||||
# Import here so that serialized dag is only imported when serialization is enabled
|
||||
from airflow.models.serialized_dag import SerializedDagModel
|
||||
if dag_id not in self.dags:
|
||||
|
@ -363,7 +382,7 @@ class DagBag(BaseDagBag, LoggingMixin):
|
|||
**Note**: The patterns in .airflowignore are treated as
|
||||
un-anchored regexes, not shell-like glob patterns.
|
||||
"""
|
||||
if self.store_serialized_dags:
|
||||
if self.read_dags_from_db:
|
||||
return
|
||||
|
||||
self.log.info("Filling up the DagBag from %s", dag_folder)
|
||||
|
@ -439,7 +458,7 @@ class DagBag(BaseDagBag, LoggingMixin):
|
|||
self.log.debug("Calling the DAG.bulk_sync_to_db method")
|
||||
DAG.bulk_sync_to_db(self.dags.values())
|
||||
# Write Serialized DAGs to DB if DAG Serialization is turned on
|
||||
# Even though self.store_serialized_dags is False
|
||||
# Even though self.read_dags_from_db is False
|
||||
if settings.STORE_SERIALIZED_DAGS:
|
||||
self.log.debug("Calling the SerializedDagModel.bulk_sync_to_db method")
|
||||
SerializedDagModel.bulk_sync_to_db(self.dags.values())
|
||||
|
|
|
@ -29,4 +29,4 @@ def init_dagbag(app):
|
|||
if os.environ.get('SKIP_DAGS_PARSING') == 'True':
|
||||
app.dag_bag = DagBag(os.devnull, include_examples=False)
|
||||
else:
|
||||
app.dag_bag = DagBag(DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
|
||||
app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=STORE_SERIALIZED_DAGS)
|
||||
|
|
|
@ -128,7 +128,7 @@ class TestGetDagDetails(TestDagEndpoint):
|
|||
def test_should_response_200_serialized(self):
|
||||
# Create empty app with empty dagbag to check if DAG is read from db
|
||||
app_serialized = app.create_app(testing=True) # type:ignore
|
||||
dag_bag = DagBag(os.devnull, include_examples=False, store_serialized_dags=True)
|
||||
dag_bag = DagBag(os.devnull, include_examples=False, read_dags_from_db=True)
|
||||
app_serialized.dag_bag = dag_bag # type:ignore
|
||||
client = app_serialized.test_client()
|
||||
|
||||
|
|
|
@ -95,7 +95,7 @@ class TestGetTask(TestTaskEndpoint):
|
|||
def test_should_response_200_serialized(self):
|
||||
# Create empty app with empty dagbag to check if DAG is read from db
|
||||
app_serialized = app.create_app(testing=True) # type:ignore
|
||||
dag_bag = DagBag(os.devnull, include_examples=False, store_serialized_dags=True)
|
||||
dag_bag = DagBag(os.devnull, include_examples=False, read_dags_from_db=True)
|
||||
app_serialized.dag_bag = dag_bag # type:ignore
|
||||
client = app_serialized.test_client()
|
||||
|
||||
|
|
|
@ -128,9 +128,9 @@ class TestDagFileProcessor(unittest.TestCase):
|
|||
@patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
|
||||
def setUpClass(cls):
|
||||
# Ensure the DAGs we are looking at from the DB are up-to-date
|
||||
non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
|
||||
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False)
|
||||
non_serialized_dagbag.sync_to_db()
|
||||
cls.dagbag = DagBag(store_serialized_dags=True)
|
||||
cls.dagbag = DagBag(read_dags_from_db=True)
|
||||
|
||||
def test_dag_file_processor_sla_miss_callback(self):
|
||||
"""
|
||||
|
@ -1373,9 +1373,9 @@ class TestSchedulerJob(unittest.TestCase):
|
|||
@patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
|
||||
def setUpClass(cls):
|
||||
# Ensure the DAGs we are looking at from the DB are up-to-date
|
||||
non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
|
||||
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False)
|
||||
non_serialized_dagbag.sync_to_db()
|
||||
cls.dagbag = DagBag(store_serialized_dags=True)
|
||||
cls.dagbag = DagBag(read_dags_from_db=True)
|
||||
|
||||
def test_is_alive(self):
|
||||
job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
|
||||
|
|
|
@ -642,7 +642,7 @@ class TestDagBag(unittest.TestCase):
|
|||
def test_serialized_dags_are_written_to_db_on_sync(self):
|
||||
"""
|
||||
Test that when dagbag.sync_to_db is called the DAGs are Serialized and written to DB
|
||||
even when dagbag.store_serialized_dags is False
|
||||
even when dagbag.read_dags_from_db is False
|
||||
"""
|
||||
with create_session() as session:
|
||||
serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
|
||||
|
@ -653,7 +653,7 @@ class TestDagBag(unittest.TestCase):
|
|||
include_examples=False)
|
||||
dagbag.sync_to_db()
|
||||
|
||||
self.assertFalse(dagbag.store_serialized_dags)
|
||||
self.assertFalse(dagbag.read_dags_from_db)
|
||||
|
||||
new_serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
|
||||
self.assertEqual(new_serialized_dags_count, 1)
|
||||
|
|
Загрузка…
Ссылка в новой задаче