[AIRFLOW-2918] Remove unused imports

This commit is contained in:
Fokko Driesprong 2018-09-21 16:36:28 +02:00 коммит произвёл Sid Anand
Родитель dd85126f26
Коммит 491fd743da
28 изменённых файлов: 96 добавлений и 98 удалений

Просмотреть файл

@ -18,7 +18,6 @@
# under the License.
import os
import sys

Просмотреть файл

@ -28,12 +28,9 @@ from builtins import input
from past.builtins import basestring
from datetime import datetime
from functools import reduce
import imp
import os
import re
import signal
import sys
import warnings
from jinja2 import Template

Просмотреть файл

@ -237,6 +237,7 @@ def dag_run_status(dag_id, execution_date):
return jsonify(info)
@api_experimental.route('/latest_runs', methods=['GET'])
@requires_authentication
def latest_dag_runs():

Просмотреть файл

@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-
#
# Airflow documentation build configuration file, created by
# flake8: noqa
# Disable Flake8 because of all the sphinx imports
#
# Airflow documentation build configuration file, created by
# sphinx-quickstart on Thu Oct 9 20:50:01 2014.
#
# This file is execfile()d with the current directory set to its

Просмотреть файл

@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Просмотреть файл

@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Просмотреть файл

@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -129,8 +129,7 @@ class SchedulerMetricsJob(SchedulerJob):
if len(successful_tis) == num_task_instances:
self.log.info("All tasks processed! Printing stats.")
else:
self.log.info("Test timeout reached. "
"Printing available stats.")
self.log.info("Test timeout reached. Printing available stats.")
self.print_stats()
set_dags_paused_state(True)
sys.exit()

Просмотреть файл

@ -16,7 +16,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from .operators import *
from .sensors import *

Просмотреть файл

@ -23,7 +23,6 @@ import io
import json
import textwrap
import zipfile
import base64
from airflow.contrib.hooks.aws_lambda_hook import AwsLambdaHook

Просмотреть файл

@ -19,7 +19,6 @@
#
import unittest
from mock import call
from mock import MagicMock
from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook,\

Просмотреть файл

@ -19,9 +19,7 @@
#
import six
import sys
import unittest
from io import StringIO
from itertools import dropwhile
from mock import patch, call

Просмотреть файл

@ -18,7 +18,6 @@
# under the License.
#
import jinja2
import unittest
from datetime import datetime

Просмотреть файл

@ -22,7 +22,6 @@ from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import sys
import unittest
from airflow.hooks.postgres_hook import PostgresHook

Просмотреть файл

@ -435,7 +435,7 @@ class CoreTest(unittest.TestCase):
Tests that Operators reject illegal arguments
"""
with warnings.catch_warnings(record=True) as w:
t = BashOperator(
BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=self.dag,
@ -947,7 +947,7 @@ class CoreTest(unittest.TestCase):
models.DagStat.update([], session=session)
run1 = self.dag_bash.create_dagrun(
self.dag_bash.create_dagrun(
run_id="run1",
execution_date=DEFAULT_DATE,
state=State.RUNNING)
@ -965,7 +965,7 @@ class CoreTest(unittest.TestCase):
self.assertEqual(stats.count, 0)
self.assertFalse(stats.dirty)
run2 = self.dag_bash.create_dagrun(
self.dag_bash.create_dagrun(
run_id="run2",
execution_date=DEFAULT_DATE + timedelta(days=1),
state=State.RUNNING)

Просмотреть файл

@ -16,5 +16,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from .dask_executor import *

Просмотреть файл

@ -27,7 +27,7 @@ from airflow.executors.celery_executor import CELERY_FETCH_ERR_MSG_HEADER
from airflow.utils.state import State
# leave this it is used by the test worker
import celery.contrib.testing.tasks
import celery.contrib.testing.tasks # noqa: F401
class CeleryExecutorTest(unittest.TestCase):

Просмотреть файл

@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime, timedelta
from datetime import datetime
import unittest
from airflow.macros import hive

Просмотреть файл

@ -38,7 +38,6 @@ from airflow import configuration, models, settings, AirflowException
from airflow.exceptions import AirflowDagCycleException, AirflowSkipException
from airflow.jobs import BackfillJob
from airflow.models import DAG, TaskInstance as TI
from airflow.models import DagRun
from airflow.models import State as ST
from airflow.models import DagModel, DagRun, DagStat
from airflow.models import clear_task_instances
@ -342,20 +341,41 @@ class DagTest(unittest.TestCase):
session.merge(ti4)
session.commit()
self.assertEqual(0, DAG.get_num_task_instances(test_dag_id, ['fakename'],
session=session))
self.assertEqual(4, DAG.get_num_task_instances(test_dag_id, [test_task_id],
session=session))
self.assertEqual(4, DAG.get_num_task_instances(test_dag_id,
['fakename', test_task_id], session=session))
self.assertEqual(1, DAG.get_num_task_instances(test_dag_id, [test_task_id],
states=[None], session=session))
self.assertEqual(2, DAG.get_num_task_instances(test_dag_id, [test_task_id],
states=[State.RUNNING], session=session))
self.assertEqual(3, DAG.get_num_task_instances(test_dag_id, [test_task_id],
states=[None, State.RUNNING], session=session))
self.assertEqual(4, DAG.get_num_task_instances(test_dag_id, [test_task_id],
states=[None, State.QUEUED, State.RUNNING], session=session))
self.assertEqual(
0,
DAG.get_num_task_instances(test_dag_id, ['fakename'], session=session)
)
self.assertEqual(
4,
DAG.get_num_task_instances(test_dag_id, [test_task_id], session=session)
)
self.assertEqual(
4,
DAG.get_num_task_instances(
test_dag_id, ['fakename', test_task_id], session=session)
)
self.assertEqual(
1,
DAG.get_num_task_instances(
test_dag_id, [test_task_id], states=[None], session=session)
)
self.assertEqual(
2,
DAG.get_num_task_instances(
test_dag_id, [test_task_id], states=[State.RUNNING], session=session)
)
self.assertEqual(
3,
DAG.get_num_task_instances(
test_dag_id, [test_task_id],
states=[None, State.RUNNING], session=session)
)
self.assertEqual(
4,
DAG.get_num_task_instances(
test_dag_id, [test_task_id],
states=[None, State.QUEUED, State.RUNNING], session=session)
)
session.close()
def test_render_template_field(self):
@ -376,7 +396,7 @@ class DagTest(unittest.TestCase):
dag = DAG('test-dag',
start_date=DEFAULT_DATE,
user_defined_macros = dict(foo='bar'))
user_defined_macros=dict(foo='bar'))
with dag:
task = DummyOperator(task_id='op1')
@ -415,11 +435,11 @@ class DagTest(unittest.TestCase):
if a custom filter was defined"""
def jinja_udf(name):
return 'Hello %s' %name
return 'Hello %s' % name
dag = DAG('test-dag',
start_date=DEFAULT_DATE,
user_defined_filters = dict(hello=jinja_udf))
user_defined_filters=dict(hello=jinja_udf))
with dag:
task = DummyOperator(task_id='op1')
@ -693,10 +713,14 @@ class DagRunTest(unittest.TestCase):
session.commit()
self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True)))
self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False)))
self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True)))
self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False)))
self.assertEqual(1,
len(models.DagRun.find(dag_id=dag_id1, external_trigger=True)))
self.assertEqual(0,
len(models.DagRun.find(dag_id=dag_id1, external_trigger=False)))
self.assertEqual(0,
len(models.DagRun.find(dag_id=dag_id2, external_trigger=True)))
self.assertEqual(1,
len(models.DagRun.find(dag_id=dag_id2, external_trigger=False)))
def test_dagrun_success_when_all_skipped(self):
"""
@ -1081,9 +1105,9 @@ class DagRunTest(unittest.TestCase):
dag_id='test_latest_runs_1',
start_date=DEFAULT_DATE)
dag_1_run_1 = self.create_dag_run(dag,
execution_date=timezone.datetime(2015, 1, 1))
execution_date=timezone.datetime(2015, 1, 1))
dag_1_run_2 = self.create_dag_run(dag,
execution_date=timezone.datetime(2015, 1, 2))
execution_date=timezone.datetime(2015, 1, 2))
dagruns = models.DagRun.get_latest_runs(session)
session.close()
for dagrun in dagruns:
@ -1207,7 +1231,7 @@ class DagBagTest(unittest.TestCase):
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d))
self.assertEqual(len(dagbag.import_errors), len(invalid_dag_files))
@patch.object(DagModel,'get_current')
@patch.object(DagModel, 'get_current')
def test_get_dag_without_refresh(self, mock_dagmodel):
"""
Test that, once a DAG is loaded, it doesn't get refreshed again if it
@ -1221,6 +1245,7 @@ class DagBagTest(unittest.TestCase):
class TestDagBag(models.DagBag):
process_file_calls = 0
def process_file(self, filepath, only_if_updated=True, safe_mode=True):
if 'example_bash_operator.py' == os.path.basename(filepath):
TestDagBag.process_file_calls += 1
@ -1440,6 +1465,7 @@ class DagBagTest(unittest.TestCase):
Don't crash when loading an invalid (contains a cycle) DAG file.
Don't load the dag into the DagBag either
"""
# Define Dag to load
def basic_cycle():
from airflow.models import DAG
@ -1611,7 +1637,8 @@ class TaskInstanceTest(unittest.TestCase):
"""
Test that tasks properly take start/end dates from DAGs
"""
dag = DAG('dag', start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + datetime.timedelta(days=10))
dag = DAG('dag', start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10))
op1 = DummyOperator(task_id='op_1', owner='test')
@ -1777,7 +1804,6 @@ class TaskInstanceTest(unittest.TestCase):
ti.run()
self.assertEqual(ti.state, models.State.NONE)
@patch.object(TI, 'pool_full')
def test_run_pooling_task(self, mock_pool_full):
"""
@ -1971,11 +1997,11 @@ class TaskInstanceTest(unittest.TestCase):
ti.try_number = 9
dt = ti.next_retry_datetime()
self.assertEqual(dt, ti.end_date+max_delay)
self.assertEqual(dt, ti.end_date + max_delay)
ti.try_number = 50
dt = ti.next_retry_datetime()
self.assertEqual(dt, ti.end_date+max_delay)
self.assertEqual(dt, ti.end_date + max_delay)
def test_depends_on_past(self):
dagbag = models.DagBag()
@ -2217,7 +2243,7 @@ class TaskInstanceTest(unittest.TestCase):
def test_check_and_change_state_before_execution_dep_not_met(self):
dag = models.DAG(dag_id='test_check_and_change_state_before_execution')
task = DummyOperator(task_id='task', dag=dag, start_date=DEFAULT_DATE)
task2= DummyOperator(task_id='task2', dag=dag, start_date=DEFAULT_DATE)
task2 = DummyOperator(task_id='task2', dag=dag, start_date=DEFAULT_DATE)
task >> task2
ti = TI(
task=task2, execution_date=timezone.utcnow())
@ -2436,7 +2462,8 @@ class ClearTasksTest(unittest.TestCase):
for i in range(num_of_dags):
dag = DAG('test_dag_clear_' + str(i), start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10))
ti = TI(task=DummyOperator(task_id='test_task_clear_' + str(i), owner='test', dag=dag),
ti = TI(task=DummyOperator(task_id='test_task_clear_' + str(i), owner='test',
dag=dag),
execution_date=DEFAULT_DATE)
dags.append(dag)
tis.append(ti)
@ -2658,8 +2685,8 @@ class ConnectionTest(unittest.TestCase):
self.assertIsNone(connection.extra)
def test_connection_from_uri_with_extras(self):
uri = 'scheme://user:password@host%2flocation:1234/schema?'\
'extra1=a%20value&extra2=%2fpath%2f'
uri = 'scheme://user:password@host%2flocation:1234/schema?' \
'extra1=a%20value&extra2=%2fpath%2f'
connection = Connection(uri=uri)
self.assertEqual(connection.conn_type, 'scheme')
self.assertEqual(connection.host, 'host/location')

Просмотреть файл

@ -17,11 +17,3 @@
# specific language governing permissions and limitations
# under the License.
from .docker_operator import *
from .subdag_operator import *
from .operators import *
from .hive_operator import *
from .s3_to_hive_operator import *
from .python_operator import *
from .latest_only_operator import *

Просмотреть файл

@ -20,11 +20,9 @@
from __future__ import print_function, unicode_literals
import datetime
import logging
import unittest
from airflow import configuration, DAG, settings
from airflow.jobs import BackfillJob
from airflow.models import TaskInstance
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.dummy_operator import DummyOperator

Просмотреть файл

@ -26,7 +26,6 @@ from collections import OrderedDict
import os
import mock
import six
import unittest
configuration.load_test_config()

Просмотреть файл

@ -28,13 +28,10 @@ import unittest
from subprocess import CalledProcessError
from airflow import configuration, DAG
from airflow.models import TaskInstance
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.settings import Session
from airflow.utils import timezone
from airflow.exceptions import AirflowException
import logging
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
END_DATE = timezone.datetime(2016, 1, 2)
@ -83,7 +80,7 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
def test_no_system_site_packages(self):
def f():
try:
import funcsigs
import funcsigs # noqa: F401
except ImportError:
return True
raise Exception
@ -91,26 +88,31 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
def test_system_site_packages(self):
def f():
import funcsigs
import funcsigs # noqa: F401
self._run_as_operator(f, requirements=['funcsigs'], system_site_packages=True)
def test_with_requirements_pinned(self):
self.assertNotEqual('0.4', funcsigs.__version__, 'Please update this string if this fails')
self.assertNotEqual(
'0.4', funcsigs.__version__, 'Please update this string if this fails')
def f():
import funcsigs
import funcsigs # noqa: F401
if funcsigs.__version__ != '0.4':
raise Exception
self._run_as_operator(f, requirements=['funcsigs==0.4'])
def test_unpinned_requirements(self):
def f():
import funcsigs
self._run_as_operator(f, requirements=['funcsigs', 'dill'], system_site_packages=False)
import funcsigs # noqa: F401
self._run_as_operator(
f, requirements=['funcsigs', 'dill'], system_site_packages=False)
def test_range_requirements(self):
def f():
import funcsigs
self._run_as_operator(f, requirements=['funcsigs>1.0', 'dill'], system_site_packages=False)
import funcsigs # noqa: F401
self._run_as_operator(
f, requirements=['funcsigs>1.0', 'dill'], system_site_packages=False)
def test_fail(self):
def f():
@ -152,8 +154,10 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
version = 3
else:
version = 2
def f():
pass
with self.assertRaises(AirflowException):
self._run_as_operator(f, python_version=version, op_args=[1])
@ -168,11 +172,12 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
print(virtualenv_string_args)
if virtualenv_string_args[0] != virtualenv_string_args[2]:
raise Exception
self._run_as_operator(f, python_version=self._invert_python_major_version(), string_args=[1,2,1])
self._run_as_operator(
f, python_version=self._invert_python_major_version(), string_args=[1, 2, 1])
def test_with_args(self):
def f(a, b, c=False, d=False):
if a==0 and b==1 and c and not d:
if a == 0 and b == 1 and c and not d:
return True
else:
raise Exception

Просмотреть файл

@ -22,12 +22,9 @@ from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import inspect
import logging
import unittest
from flask.blueprints import Blueprint
from flask_admin import BaseView
from flask_admin.menu import MenuLink, MenuView
from airflow.hooks.base_hook import BaseHook

Просмотреть файл

@ -16,5 +16,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from .kerberos import *

Просмотреть файл

@ -24,7 +24,6 @@ from mock import Mock
from airflow.models import DAG, BaseOperator
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
from airflow.utils.state import State
class TaskConcurrencyDepTest(unittest.TestCase):

Просмотреть файл

@ -20,7 +20,6 @@
import json
import mock
import os
import socket
import unittest
from datetime import datetime

Просмотреть файл

@ -17,11 +17,8 @@
# specific language governing permissions and limitations
# under the License.
import json
import unittest
from datetime import datetime
from backports.configparser import DuplicateSectionError
from airflow import models

Просмотреть файл

@ -31,7 +31,7 @@ import json
from urllib.parse import quote_plus
from werkzeug.test import Client
from airflow import models, configuration, settings
from airflow import models, configuration
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.models import DAG, DagRun, TaskInstance
from airflow.operators.dummy_operator import DummyOperator