[AIRFLOW-5444] Fix action_logging so that request.form for POST is logged (#6064)

Log request.values so both GET and POST are properly logged
This commit is contained in:
yuqian90 2019-10-04 17:31:50 +08:00 коммит произвёл Ash Berlin-Taylor
Родитель e3731b1e33
Коммит e9ab9d6a92
2 изменённых файлов: 100 добавлений и 5 удалений

Просмотреть файл

@ -45,13 +45,13 @@ def action_logging(f):
event=f.__name__,
task_instance=None,
owner=user,
extra=str(list(request.args.items())),
task_id=request.args.get('task_id'),
dag_id=request.args.get('dag_id'))
extra=str(list(request.values.items())),
task_id=request.values.get('task_id'),
dag_id=request.values.get('dag_id'))
if 'execution_date' in request.args:
if 'execution_date' in request.values:
log.execution_date = pendulum.parse(
request.args.get('execution_date'))
request.values.get('execution_date'))
session.add(log)

Просмотреть файл

@ -1882,5 +1882,100 @@ class TestDagRunModelView(TestBase):
self.assertEqual(dr.execution_date, timezone.convert_to_utc(datetime(2018, 7, 6, 5, 4, 3)))
class TestDecorators(TestBase):
EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE))
@classmethod
def setUpClass(cls):
super().setUpClass()
dagbag = models.DagBag(include_examples=True)
for dag in dagbag.dags.values():
dag.sync_to_db()
def setUp(self):
super().setUp()
self.logout()
self.login()
self.cleanup_dagruns()
self.prepare_dagruns()
def cleanup_dagruns(self):
DR = models.DagRun
dag_ids = ['example_bash_operator',
'example_subdag_operator',
'example_xcom']
(self.session
.query(DR)
.filter(DR.dag_id.in_(dag_ids))
.filter(DR.run_id == self.run_id)
.delete(synchronize_session='fetch'))
self.session.commit()
def prepare_dagruns(self):
dagbag = models.DagBag(include_examples=True)
self.bash_dag = dagbag.dags['example_bash_operator']
self.sub_dag = dagbag.dags['example_subdag_operator']
self.xcom_dag = dagbag.dags['example_xcom']
self.bash_dagrun = self.bash_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)
self.sub_dagrun = self.sub_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)
self.xcom_dagrun = self.xcom_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)
def check_last_log(self, dag_id, event, execution_date=None):
from airflow.models import Log
qry = self.session.query(Log.dag_id, Log.task_id, Log.event, Log.execution_date,
Log.owner, Log.extra)
qry = qry.filter(Log.dag_id == dag_id, Log.event == event)
if execution_date:
qry = qry.filter(Log.execution_date == execution_date)
logs = qry.order_by(Log.dttm.desc()).limit(5).all()
self.assertGreaterEqual(len(logs), 1)
self.assertTrue(logs[0].extra)
def test_action_logging_get(self):
url = 'graph?dag_id=example_bash_operator&execution_date={}'.format(
self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('runme_1', resp)
# In mysql backend, this commit() is needed to write down the logs
self.session.commit()
self.check_last_log("example_bash_operator", event="graph",
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE)
def test_action_logging_post(self):
form = dict(
task_id="runme_1",
dag_id="example_bash_operator",
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
upstream="false",
downstream="false",
future="false",
past="false",
only_failed="false",
)
resp = self.client.post("clear", data=form)
self.check_content_in_response(['example_bash_operator', 'Wait a minute'], resp)
# In mysql backend, this commit() is needed to write down the logs
self.session.commit()
self.check_last_log("example_bash_operator", event="clear",
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE)
if __name__ == '__main__':
unittest.main()