diff --git a/scripts/ci/docker-compose/local.yml b/scripts/ci/docker-compose/local.yml index a6c8d7eaab..619156f5be 100644 --- a/scripts/ci/docker-compose/local.yml +++ b/scripts/ci/docker-compose/local.yml @@ -50,7 +50,6 @@ services: - ../../../pylintrc:/opt/airflow/pylintrc:cached - ../../../pytest.ini:/opt/airflow/pytest.ini:cached - ../../../scripts:/opt/airflow/scripts:cached - - ../../../scripts/perf:/opt/airflow/scripts/perf:cached - ../../../scripts/in_container/entrypoint_ci.sh:/entrypoint:cached - ../../../setup.cfg:/opt/airflow/setup.cfg:cached - ../../../setup.py:/opt/airflow/setup.py:cached diff --git a/scripts/ci/libraries/_local_mounts.sh b/scripts/ci/libraries/_local_mounts.sh index 63989027f5..4300af9f13 100644 --- a/scripts/ci/libraries/_local_mounts.sh +++ b/scripts/ci/libraries/_local_mounts.sh @@ -46,7 +46,6 @@ function generate_local_mounts_list { "$prefix"pylintrc:/opt/airflow/pylintrc:cached "$prefix"pytest.ini:/opt/airflow/pytest.ini:cached "$prefix"scripts:/opt/airflow/scripts:cached - "$prefix"scripts/perf:/opt/airflow/scripts/perf:cached "$prefix"scripts/in_container/entrypoint_ci.sh:/entrypoint:cached "$prefix"setup.cfg:/opt/airflow/setup.cfg:cached "$prefix"setup.py:/opt/airflow/setup.py:cached diff --git a/tests/conftest.py b/tests/conftest.py index 5414e84408..eac88d6271 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -33,12 +33,7 @@ os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True" os.environ["AWS_DEFAULT_REGION"] = (os.environ.get("AWS_DEFAULT_REGION") or "us-east-1") os.environ["CREDENTIALS_DIR"] = (os.environ.get('CREDENTIALS_DIR') or "/files/airflow-breeze-config/keys") -perf_directory = os.path.abspath(os.path.join(tests_directory, os.pardir, 'scripts', 'perf')) -if perf_directory not in sys.path: - sys.path.append(perf_directory) - - -from perf_kit.sqlalchemy import ( # noqa: E402 isort:skip # pylint: disable=wrong-import-position +from tests.utils.perf.perf_kit.sqlalchemy import ( # noqa isort:skip # pylint: disable=wrong-import-position count_queries, trace_queries ) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 0020eadca2..49e0ee8948 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -63,7 +63,7 @@ from tests.test_utils.mock_executor import MockExecutor ROOT_FOLDER = os.path.realpath( os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir) ) -PERF_DAGS_FOLDER = os.path.join(ROOT_FOLDER, "scripts", "perf", "dags") +PERF_DAGS_FOLDER = os.path.join(ROOT_FOLDER, "tests", "utils", "perf", "dags") ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER, "elastic_dag.py") TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER'] diff --git a/scripts/perf/dags/elastic_dag.py b/tests/utils/perf/dags/elastic_dag.py similarity index 100% rename from scripts/perf/dags/elastic_dag.py rename to tests/utils/perf/dags/elastic_dag.py diff --git a/scripts/perf/dags/perf_dag_1.py b/tests/utils/perf/dags/perf_dag_1.py similarity index 100% rename from scripts/perf/dags/perf_dag_1.py rename to tests/utils/perf/dags/perf_dag_1.py diff --git a/scripts/perf/dags/perf_dag_2.py b/tests/utils/perf/dags/perf_dag_2.py similarity index 100% rename from scripts/perf/dags/perf_dag_2.py rename to tests/utils/perf/dags/perf_dag_2.py diff --git a/scripts/perf/dags/sql_perf_dag.py b/tests/utils/perf/dags/sql_perf_dag.py similarity index 100% rename from scripts/perf/dags/sql_perf_dag.py rename to tests/utils/perf/dags/sql_perf_dag.py diff --git a/scripts/perf/perf_kit/__init__.py b/tests/utils/perf/perf_kit/__init__.py similarity index 81% rename from scripts/perf/perf_kit/__init__.py rename to tests/utils/perf/perf_kit/__init__.py index 7c79c3562f..d9a2c4892e 100644 --- a/scripts/perf/perf_kit/__init__.py +++ b/tests/utils/perf/perf_kit/__init__.py @@ -32,21 +32,21 @@ Content ======= The following decorators and context managers are included. -.. autofunction:: perf_kit.memory.trace_memory +.. autofunction:: tests.utils.perf.perf_kit.memory.trace_memory -.. autofunction:: perf_kit.python.pyspy +.. autofunction:: tests.utils.perf.perf_kit.python.pyspy -.. autofunction:: perf_kit.python.profiled +.. autofunction:: tests.utils.perf.perf_kit.python.profiled -.. autofunction:: perf_kit.repeat_and_time.timing +.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.timing -.. autofunction:: perf_kit.repeat_and_time.repeat +.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.repeat -.. autofunction:: perf_kit.repeat_and_time.timeout +.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.timeout -.. autofunction:: perf_kit.sqlalchemy.trace_queries +.. autofunction:: tests.utils.perf.perf_kit.sqlalchemy.trace_queries -.. autofunction:: perf_kit.sqlalchemy.count_queries +.. autofunction:: tests.utils.perf.perf_kit.sqlalchemy.count_queries Documentation for each function is provided in the function docstrings. Each module also has an example in the main section of the module. @@ -54,11 +54,12 @@ the main section of the module. Examples ======== -If you want to run an all example for ``perf_kit.sqlalchemy``, you can run the following command. +If you want to run an all example for ``tests.utils.perf.perf_kit.sqlalchemy``, you can run the +following command. .. code-block:: bash - python -m perf_kit.sqlalchemy + python -m tests.utils.perf_kit.sqlalchemy If you want to know how to use these functions, it is worth to familiarize yourself with these examples. @@ -98,7 +99,7 @@ queries in it. self.assertEqual(prev_local.isoformat(), "2018-03-24T03:00:00+01:00") self.assertEqual(prev.isoformat(), "2018-03-24T02:00:00+00:00") - from perf_kit.sqlalchemy import trace_queries + from tests.utils.perf.perf_kit.sqlalchemy import trace_queries @trace_queries def test_bulk_sync_to_db(self): diff --git a/scripts/perf/perf_kit/memory.py b/tests/utils/perf/perf_kit/memory.py similarity index 98% rename from scripts/perf/perf_kit/memory.py rename to tests/utils/perf/perf_kit/memory.py index ba3576e393..f84c505efa 100644 --- a/scripts/perf/perf_kit/memory.py +++ b/tests/utils/perf/perf_kit/memory.py @@ -35,6 +35,7 @@ def _human_readable_size(size, decimal_places=3): class TraceMemoryResult: + """Trace results of memory,""" def __init__(self): self.before = 0 self.after = 0 diff --git a/scripts/perf/perf_kit/python.py b/tests/utils/perf/perf_kit/python.py similarity index 86% rename from scripts/perf/perf_kit/python.py rename to tests/utils/perf/perf_kit/python.py index 36a0b11afa..3169e9c43e 100644 --- a/scripts/perf/perf_kit/python.py +++ b/tests/utils/perf/perf_kit/python.py @@ -45,7 +45,7 @@ def pyspy(): cap_add: - SYS_PTRACE - In the case of Airflow Breeze, you should modify the ``scripts/perf/perf_kit/python.py`` file. + In the case of Airflow Breeze, you should modify the ``tests/utils/perf/perf_kit/python.py`` file. """ pid = str(os.getpid()) suffix = datetime.datetime.now().isoformat() @@ -66,24 +66,28 @@ def profiled(print_callers=False): This decorator provide deterministic profiling. It uses ``cProfile`` internally. It generates statistic and print on the screen. """ - pr = cProfile.Profile() - pr.enable() + profile = cProfile.Profile() + profile.enable() try: yield finally: - pr.disable() - s = io.StringIO() - ps = pstats.Stats(pr, stream=s).sort_stats("cumulative") + profile.disable() + stat = io.StringIO() + pstatistics = pstats.Stats(profile, stream=stat).sort_stats("cumulative") if print_callers: - ps.print_callers() + pstatistics.print_callers() else: - ps.print_stats() - print(s.getvalue()) + pstatistics.print_stats() + print(stat.getvalue()) if __name__ == "__main__": def case(): + """ + Load modules. + :return: + """ import logging import airflow diff --git a/scripts/perf/perf_kit/repeat_and_time.py b/tests/utils/perf/perf_kit/repeat_and_time.py similarity index 88% rename from scripts/perf/perf_kit/repeat_and_time.py rename to tests/utils/perf/perf_kit/repeat_and_time.py index 519f80249f..8efd7f10b6 100644 --- a/scripts/perf/perf_kit/repeat_and_time.py +++ b/tests/utils/perf/perf_kit/repeat_and_time.py @@ -23,6 +23,7 @@ import time class TimingResult: + """Timing result.""" def __init__(self): self.start_time = 0 self.end_time = 0 @@ -65,7 +66,7 @@ def repeat(repeat_count=5): @functools.wraps(f) def wrap(*args, **kwargs): last_result = None - for i in range(repeat_count): + for _ in range(repeat_count): last_result = f(*args, **kwargs) return last_result @@ -75,7 +76,7 @@ def repeat(repeat_count=5): class TimeoutException(Exception): - pass + """Exception when the test timeo uts""" @contextlib.contextmanager @@ -109,13 +110,13 @@ def timeout(seconds=1): if __name__ == "__main__": def monte_carlo(total=10000): - # Monte Carlo + """Monte Carlo""" inside = 0 - for i in range(0, total): - x2 = random.random() ** 2 - y2 = random.random() ** 2 - if math.sqrt(x2 + y2) < 1.0: + for _ in range(0, total): + x_val = random.random() ** 2 + y_val = random.random() ** 2 + if math.sqrt(x_val + y_val) < 1.0: inside += 1 return (float(inside) / total) * 4 @@ -134,15 +135,16 @@ if __name__ == "__main__": @timing(REPEAT_COUNT) @repeat(REPEAT_COUNT) @timing() - def pi(): + def get_pi(): + """Returns PI value:""" return monte_carlo() - result = pi() - print("PI: ", result) + res = get_pi() + print("PI: ", res) print() # Example 3: with timing(): - result = monte_carlo() + res = monte_carlo() - print("PI: ", result) + print("PI: ", res) diff --git a/scripts/perf/perf_kit/sqlalchemy.py b/tests/utils/perf/perf_kit/sqlalchemy.py similarity index 69% rename from scripts/perf/perf_kit/sqlalchemy.py rename to tests/utils/perf/perf_kit/sqlalchemy.py index 72ab7a3c0b..eb7f72bdd5 100644 --- a/scripts/perf/perf_kit/sqlalchemy.py +++ b/tests/utils/perf/perf_kit/sqlalchemy.py @@ -32,6 +32,7 @@ def _pretty_format_sql(text: str): return text +# noinspection PyUnusedLocal class TraceQueries: """ Tracking SQL queries in a code block. @@ -61,11 +62,46 @@ class TraceQueries: self.print_fn = print_fn self.query_count = 0 - def before_cursor_execute(self, conn, cursor, statement, parameters, context, executemany): + def before_cursor_execute(self, + conn, + cursor, # pylint: disable=unused-argument + statement, # pylint: disable=unused-argument + parameters, # pylint: disable=unused-argument + context, # pylint: disable=unused-argument + executemany): # pylint: disable=unused-argument + """ + Executed before cursor. + + :param conn: connection + :param cursor: cursor + :param statement: statement + :param parameters: parameters + :param context: context + :param executemany: whether many statements executed + :return: + """ + conn.info.setdefault("query_start_time", []).append(time.monotonic()) self.query_count += 1 - def after_cursor_execute(self, conn, cursor, statement, parameters, context, executemany): + def after_cursor_execute(self, + conn, + cursor, # pylint: disable=unused-argument + statement, + parameters, + context, # pylint: disable=unused-argument + executemany): # pylint: disable=unused-argument + """ + Executed after cursor. + + :param conn: connection + :param cursor: cursor + :param statement: statement + :param parameters: parameters + :param context: context + :param executemany: whether many statements executed + :return: + """ total = time.monotonic() - conn.info["query_start_time"].pop() file_names = [ f"{f.filename}:{f.name}:{f.lineno}" @@ -102,7 +138,8 @@ class TraceQueries: event.listen(airflow.settings.engine, "before_cursor_execute", self.before_cursor_execute) event.listen(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute) - def __exit__(self, type_, value, traceback): + # noinspection PyShadowingNames + def __exit__(self, type_, value, traceback): # pylint: disable=redefined-outer-name import airflow.settings event.remove(airflow.settings.engine, "before_cursor_execute", self.before_cursor_execute) event.remove(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute) @@ -112,6 +149,9 @@ trace_queries = TraceQueries # pylint: disable=invalid-name class CountQueriesResult: + """ + Counter for number of queries. + """ def __init__(self): self.count = 0 @@ -136,13 +176,30 @@ class CountQueries: event.listen(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute) return self.result - def __exit__(self, type_, value, traceback): + # noinspection PyShadowingNames + def __exit__(self, type_, value, traceback): # pylint: disable=redefined-outer-name import airflow.settings event.remove(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute) self.print_fn(f"Count SQL queries: {self.result.count}") - def after_cursor_execute(self, *args, **kwargs): + def after_cursor_execute(self, + conn, # pylint: disable=unused-argument + cursor, # pylint: disable=unused-argument + statement, # pylint: disable=unused-argument + parameters, # pylint: disable=unused-argument + context, # pylint: disable=unused-argument + executemany): # pylint: disable=unused-argument + """ + Executed after cursor. + + :param conn: connection + :param cursor: cursor + :param statement: statement + :param parameters: parameters + :param context: context + :param executemany: whether many statements executed + """ self.result.count += 1 @@ -152,6 +209,7 @@ if __name__ == "__main__": # Example: def case(): + "Case of logging om/" import logging from unittest import mock diff --git a/scripts/perf/scheduler_dag_execution_timing.py b/tests/utils/perf/scheduler_dag_execution_timing.py similarity index 97% rename from scripts/perf/scheduler_dag_execution_timing.py rename to tests/utils/perf/scheduler_dag_execution_timing.py index ee177a29bb..6756366a13 100755 --- a/scripts/perf/scheduler_dag_execution_timing.py +++ b/tests/utils/perf/scheduler_dag_execution_timing.py @@ -101,15 +101,15 @@ def get_executor_under_test(dotted_path): if dotted_path == "MockExecutor": try: # Run against master and 1.10.x releases - from tests.test_utils.mock_executor import MockExecutor as Executor + from tests.test_utils.mock_executor import MockExecutor as executor except ImportError: - from tests.executors.test_executor import TestExecutor as Executor + from tests.executors.test_executor import TestExecutor as executor else: - Executor = ExecutorLoader.load_executor(dotted_path) + executor = ExecutorLoader.load_executor(dotted_path) # Change this to try other executors - class ShortCircuitExecutor(ShortCircuitExecutorMixin, Executor): + class ShortCircuitExecutor(ShortCircuitExecutorMixin, executor): """ Placeholder class that implements the inheritance hierarchy """ @@ -153,16 +153,16 @@ def create_dag_runs(dag, num_runs, session): try: from airflow.utils.types import DagRunType - ID_PREFIX = f'{DagRunType.SCHEDULED.value}__' + id_prefix = f'{DagRunType.SCHEDULED.value}__' except ImportError: from airflow.models.dagrun import DagRun - ID_PREFIX = DagRun.ID_PREFIX + id_prefix = DagRun.ID_PREFIX # pylint: disable=no-member next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) for _ in range(num_runs): dag.create_dagrun( - run_id=ID_PREFIX + next_run_date.isoformat(), + run_id=id_prefix + next_run_date.isoformat(), execution_date=next_run_date, start_date=timezone.utcnow(), state=State.RUNNING, diff --git a/scripts/perf/scheduler_ops_metrics.py b/tests/utils/perf/scheduler_ops_metrics.py similarity index 100% rename from scripts/perf/scheduler_ops_metrics.py rename to tests/utils/perf/scheduler_ops_metrics.py diff --git a/scripts/perf/sql_queries.py b/tests/utils/perf/sql_queries.py similarity index 100% rename from scripts/perf/sql_queries.py rename to tests/utils/perf/sql_queries.py