Use hash of serialized DAG to determine whether the DAG has changed (#10227)
closes #10116
This commit is contained in:
Родитель
0ee437547b
Коммит
adce6f0296
|
@ -0,0 +1,46 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Add dag_hash Column to serialized_dag table
|
||||
|
||||
Revision ID: da3f683c3a5a
|
||||
Revises: 8d48763f6d53
|
||||
Create Date: 2020-08-07 20:52:09.178296
|
||||
|
||||
"""
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'da3f683c3a5a'  # ID of this migration; matches "Revision ID" in the module docstring
|
||||
down_revision = '8d48763f6d53'  # migration this one revises; matches "Revises" in the module docstring
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
    """Add the ``dag_hash`` column to the ``serialized_dag`` table.

    The column is a non-nullable 32-character string. A server-side default
    is supplied so that the NOT NULL constraint can be applied to a table
    that may already contain rows.
    """
    dag_hash_column = sa.Column(
        'dag_hash',
        sa.String(32),
        nullable=False,
        server_default='Hash not calculated yet',
    )
    op.add_column('serialized_dag', dag_hash_column)
|
||||
|
||||
|
||||
def downgrade():
    """Revert the migration by dropping ``dag_hash`` from ``serialized_dag``."""
    table_name, column_name = 'serialized_dag', 'dag_hash'
    op.drop_column(table_name, column_name)
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
"""Serialized DAG table in database."""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
@ -53,7 +54,7 @@ class SerializedDagModel(Base):
|
|||
interval of deleting serialized DAGs in DB when the files are deleted, suggest
|
||||
to use a smaller interval such as 60
|
||||
|
||||
It is used by webserver to load dagbags when ``store_serialized_dags=True``.
|
||||
It is used by webserver to load dags when ``store_serialized_dags=True``.
|
||||
Because reading from database is lightweight compared to importing from files,
|
||||
it solves the webserver scalability issue.
|
||||
"""
|
||||
|
@ -65,6 +66,7 @@ class SerializedDagModel(Base):
|
|||
fileloc_hash = Column(BigInteger, nullable=False)
|
||||
data = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False)
|
||||
last_updated = Column(UtcDateTime, nullable=False)
|
||||
dag_hash = Column(String(32), nullable=False)
|
||||
|
||||
__table_args__ = (
|
||||
Index('idx_fileloc_hash', fileloc_hash, unique=False),
|
||||
|
@ -76,6 +78,7 @@ class SerializedDagModel(Base):
|
|||
self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
|
||||
self.data = SerializedDAG.to_dict(dag)
|
||||
self.last_updated = timezone.utcnow()
|
||||
self.dag_hash = hashlib.md5(json.dumps(self.data, sort_keys=True).encode("utf-8")).hexdigest()
|
||||
|
||||
def __repr__(self):
|
||||
return f"<SerializedDag: {self.dag_id}>"
|
||||
|
@ -102,9 +105,11 @@ class SerializedDagModel(Base):
|
|||
return
|
||||
|
||||
log.debug("Checking if DAG (%s) changed", dag.dag_id)
|
||||
serialized_dag_from_db: SerializedDagModel = session.query(cls).get(dag.dag_id)
|
||||
new_serialized_dag = cls(dag)
|
||||
if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data):
|
||||
serialized_dag_hash_from_db = session.query(
|
||||
cls.dag_hash).filter(cls.dag_id == dag.dag_id).scalar()
|
||||
|
||||
if serialized_dag_hash_from_db == new_serialized_dag.dag_hash:
|
||||
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
|
||||
return
|
||||
|
||||
|
|
|
@ -84,27 +84,26 @@ class SerializedDagModelTest(unittest.TestCase):
|
|||
SDM.write_dag(dag=example_bash_op_dag)
|
||||
|
||||
with create_session() as session:
|
||||
last_updated = session.query(
|
||||
SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
|
||||
s_dag = session.query(SDM).get(example_bash_op_dag.dag_id)
|
||||
|
||||
# Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
|
||||
# column is not updated
|
||||
SDM.write_dag(dag=example_bash_op_dag)
|
||||
last_updated_1 = session.query(
|
||||
SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
|
||||
s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id)
|
||||
|
||||
self.assertEqual(last_updated, last_updated_1)
|
||||
self.assertEqual(s_dag_1.dag_hash, s_dag.dag_hash)
|
||||
self.assertEqual(s_dag.last_updated, s_dag_1.last_updated)
|
||||
|
||||
# Update DAG
|
||||
example_bash_op_dag.tags += ["new_tag"]
|
||||
self.assertCountEqual(example_bash_op_dag.tags, ["example", "new_tag"])
|
||||
|
||||
SDM.write_dag(dag=example_bash_op_dag)
|
||||
new_s_dag = session.query(SDM.last_updated, SDM.data).filter(
|
||||
SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
|
||||
s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id)
|
||||
|
||||
self.assertNotEqual(last_updated, new_s_dag.last_updated)
|
||||
self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", "new_tag"])
|
||||
self.assertNotEqual(s_dag.last_updated, s_dag_2.last_updated)
|
||||
self.assertNotEqual(s_dag.dag_hash, s_dag_2.dag_hash)
|
||||
self.assertEqual(s_dag_2.data["dag"]["tags"], ["example", "new_tag"])
|
||||
|
||||
def test_read_dags(self):
|
||||
"""DAGs can be read from database."""
|
||||
|
|
Загрузка…
Ссылка в новой задаче