[AIRFLOW-3239] Enable existing CI tests (#4131)

1. Renamed files:
- tests/configuration.py → tests/test_configuration.py
- tests/impersonation.py → tests/test_impersonation.py
- tests/utils.py → tests/test_utils.py
- tests/operators/operators.py → tests/operators/test_operators.py
- tests/operators/bash_operator.py → tests/operators/test_bash_operator.py
- tests/jobs.py → tests/test_jobs.py

2. Updated tests/__init__.py accordingly

3. Fixed database-specific tests in tests/operators/test_operators.py

4. Fixed issue in tests/operators/test_bash_operator.py
This commit is contained in:
Xiaodong 2018-11-05 23:52:22 +08:00 коммит произвёл Fokko Driesprong
Родитель e53182cf83
Коммит 5ea0d97b49
7 изменённых файлов: 51 добавлений и 10 удалений

Просмотреть файл

@ -22,15 +22,5 @@
from __future__ import absolute_import
from .api import *
from .configuration import *
from .contrib import *
from .core import *
from .executors import *
from .jobs import *
from .impersonation import *
from .lineage import *
from .models import *
from .operators import *
from .security import *
from .task import *
from .utils import *

Просмотреть файл

@ -65,6 +65,9 @@ class BashOperatorTestCase(unittest.TestCase):
'echo $AIRFLOW_CTX_EXECUTION_DATE>> {0};'
'echo $AIRFLOW_CTX_DAG_RUN_ID>> {0};'.format(fname)
)
original_AIRFLOW_HOME = os.environ['AIRFLOW_HOME']
os.environ['AIRFLOW_HOME'] = 'MY_PATH_TO_AIRFLOW_HOME'
t.run(DEFAULT_DATE, DEFAULT_DATE,
ignore_first_depends_on_past=True, ignore_ti_state=True)
@ -78,3 +81,5 @@ class BashOperatorTestCase(unittest.TestCase):
self.assertIn('echo_env_vars', output)
self.assertIn(DEFAULT_DATE.isoformat(), output)
self.assertIn('manual__' + DEFAULT_DATE.isoformat(), output)
os.environ['AIRFLOW_HOME'] = original_AIRFLOW_HOME

Просмотреть файл

@ -53,6 +53,8 @@ class MySqlTest(unittest.TestCase):
for table in drop_tables:
conn.execute("DROP TABLE IF EXISTS {}".format(table))
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_operator_test(self):
sql = """
CREATE TABLE IF NOT EXISTS test_airflow (
@ -66,8 +68,11 @@ class MySqlTest(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_operator_test_multi(self):
sql = [
"CREATE TABLE IF NOT EXISTS test_airflow (dummy VARCHAR(50))",
"TRUNCATE TABLE test_airflow",
"INSERT INTO test_airflow VALUES ('X')",
]
@ -79,6 +84,8 @@ class MySqlTest(unittest.TestCase):
)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_hook_test_bulk_load(self):
records = ("foo", "bar", "baz")
@ -101,6 +108,8 @@ class MySqlTest(unittest.TestCase):
results = tuple(result[0] for result in c.fetchall())
self.assertEqual(sorted(results), sorted(records))
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_hook_test_bulk_dump(self):
from airflow.hooks.mysql_hook import MySqlHook
hook = MySqlHook('airflow_db')
@ -112,6 +121,8 @@ class MySqlTest(unittest.TestCase):
self.skipTest("Skip test_mysql_hook_test_bulk_load "
"since file output is not permitted")
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
@mock.patch('airflow.hooks.mysql_hook.MySqlHook.get_conn')
def test_mysql_hook_test_bulk_dump_mock(self, mock_get_conn):
mock_execute = mock.MagicMock()
@ -131,6 +142,8 @@ class MySqlTest(unittest.TestCase):
""".format(tmp_file=tmp_file, table=table)
assertEqualIgnoreMultipleSpaces(self, mock_execute.call_args[0][0], query)
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_to_mysql(self):
sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
from airflow.operators.generic_transfer import GenericTransfer
@ -148,6 +161,8 @@ class MySqlTest(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_overwrite_schema(self):
"""
Verifies option to overwrite connection schema
@ -177,6 +192,16 @@ class PostgresTest(unittest.TestCase):
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag
def tearDown(self):
tables_to_drop = ['test_postgres_to_postgres', 'test_airflow']
from airflow.hooks.postgres_hook import PostgresHook
with PostgresHook().get_conn() as conn:
with conn.cursor() as cur:
for t in tables_to_drop:
cur.execute("DROP TABLE IF EXISTS {}".format(t))
@unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a Postgres test")
def test_postgres_operator_test(self):
sql = """
CREATE TABLE IF NOT EXISTS test_airflow (
@ -197,8 +222,11 @@ class PostgresTest(unittest.TestCase):
end_date=DEFAULT_DATE,
ignore_ti_state=True)
@unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a Postgres test")
def test_postgres_operator_test_multi(self):
sql = [
"CREATE TABLE IF NOT EXISTS test_airflow (dummy VARCHAR(50))",
"TRUNCATE TABLE test_airflow",
"INSERT INTO test_airflow VALUES ('X')",
]
@ -207,6 +235,8 @@ class PostgresTest(unittest.TestCase):
task_id='postgres_operator_test_multi', sql=sql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a Postgres test")
def test_postgres_to_postgres(self):
sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
from airflow.operators.generic_transfer import GenericTransfer
@ -224,6 +254,8 @@ class PostgresTest(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a Postgres test")
def test_vacuum(self):
"""
Verifies the VACUUM operation runs well with the PostgresOperator
@ -238,6 +270,8 @@ class PostgresTest(unittest.TestCase):
autocommit=True)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a Postgres test")
def test_overwrite_schema(self):
"""
Verifies option to overwrite connection schema
@ -343,11 +377,15 @@ class TransferTests(unittest.TestCase):
with MySqlHook().get_conn() as cur:
cur.execute("DROP TABLE IF EXISTS baby_names CASCADE;")
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_clear(self):
self.dag.clear(
start_date=DEFAULT_DATE,
end_date=timezone.utcnow())
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_to_hive(self):
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
sql = "SELECT * FROM baby_names LIMIT 1000;"
@ -361,6 +399,8 @@ class TransferTests(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_to_hive_partition(self):
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
sql = "SELECT * FROM baby_names LIMIT 1000;"
@ -376,6 +416,8 @@ class TransferTests(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_to_hive_tblproperties(self):
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
sql = "SELECT * FROM baby_names LIMIT 1000;"
@ -390,6 +432,8 @@ class TransferTests(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
@mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file')
def test_mysql_to_hive_type_conversion(self, mock_load_file):
mysql_table = 'test_mysql_to_hive'
@ -433,6 +477,8 @@ class TransferTests(unittest.TestCase):
with m.get_conn() as c:
c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
"This is a MySQL test")
def test_mysql_to_hive_verify_loaded_values(self):
mysql_table = 'test_mysql_to_hive'
hive_table = 'test_mysql_to_hive'

Просмотреть файл

Просмотреть файл

Просмотреть файл

Просмотреть файл