Moving to hadoop tests to the side for now

This commit is contained in:
Maxime Beauchemin 2015-10-18 22:01:55 -07:00
Родитель ae620b9347
Коммит c7cc36ab8d
3 изменённых файлов: 161 добавлений и 182 удалений

Просмотреть файл

@ -19,6 +19,7 @@ jinja2
markdown
mysql-python
nose
nose-exclude
pandas
pygments
pyhive

Просмотреть файл

@ -15,7 +15,8 @@ nosetests \
--cover-package=airflow \
--cover-html-dir=airflow/www/static/coverage \
-v \
--logging-level=DEBUG
--logging-level=DEBUG \
--exclude-test=tests.core.HivePrestoTest
#--with-doctest \
# To run individual tests:
# nosetests tests.core:CoreTest.test_scheduler_job

Просмотреть файл

@ -21,187 +21,6 @@ except ImportError:
import pickle
class TransferTests(unittest.TestCase):
def setUp(self):
configuration.test_mode()
args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag
def test_clear(self):
self.dag.clear(start_date=DEFAULT_DATE, end_date=datetime.now())
def test_mysql_to_hive(self):
sql = "SELECT * FROM task_instance LIMIT 1000;"
t = operators.MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id='airflow_db',
sql=sql,
hive_table='airflow.test_mysql_to_hive',
recreate=True,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_mysql_to_mysql(self):
sql = "SELECT * FROM task_instance LIMIT 1000;"
t = operators.GenericTransfer(
task_id='test_m2m',
preoperator=[
"DROP TABLE IF EXISTS test_mysql_to_mysql",
"CREATE TABLE IF NOT EXISTS "
"test_mysql_to_mysql LIKE task_instance"
],
source_conn_id='airflow_db',
destination_conn_id='airflow_db',
destination_table="test_mysql_to_mysql",
sql=sql,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_mysql_to_hive_partition(self):
sql = "SELECT * FROM task_instance LIMIT 1000;"
t = operators.MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id='airflow_db',
sql=sql,
hive_table='airflow.test_mysql_to_hive_part',
partition={'ds': '2015-01-02'},
recreate=False,
create=True,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
class HivePrestoTest(unittest.TestCase):
def setUp(self):
configuration.test_mode()
args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag
self.hql = """
USE airflow;
DROP TABLE IF EXISTS static_babynames_partitioned;
CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
state string,
year string,
name string,
gender string,
num int)
PARTITIONED BY (ds string);
INSERT OVERWRITE TABLE static_babynames_partitioned
PARTITION(ds='{{ ds }}')
SELECT state, year, name, gender, num FROM static_babynames;
"""
def test_hive(self):
t = operators.HiveOperator(
task_id='basic_hql', hql=self.hql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive_dryrun(self):
t = operators.HiveOperator(
task_id='basic_hql', hql=self.hql, dag=self.dag)
t.dry_run()
def test_beeline(self):
t = operators.HiveOperator(
task_id='beeline_hql', hive_cli_conn_id='beeline_default',
hql=self.hql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_presto(self):
sql = """
SELECT count(1) FROM airflow.static_babynames_partitioned;
"""
t = operators.PrestoCheckOperator(
task_id='presto_check', sql=sql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hdfs_sensor(self):
t = operators.HdfsSensor(
task_id='hdfs_sensor_check',
filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_sql_sensor(self):
t = operators.SqlSensor(
task_id='hdfs_sensor_check',
conn_id='presto_default',
sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive_stats(self):
t = operators.HiveStatsCollectionOperator(
task_id='hive_stats_check',
table="airflow.static_babynames_partitioned",
partition={'ds': '2015-01-01'},
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive_partition_sensor(self):
t = operators.HivePartitionSensor(
task_id='hive_partition_check',
table='airflow.static_babynames_partitioned',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive2samba(self):
if 'Hive2SambaOperator' in dir(operators):
t = operators.Hive2SambaOperator(
task_id='hive2samba_check',
samba_conn_id='tableau_samba',
hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
destination_filepath='test_airflow.csv',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive_to_mysql(self):
t = operators.HiveToMySqlTransfer(
mysql_conn_id='airflow_db',
task_id='hive_to_mysql_check',
create=True,
sql="""
SELECT name
FROM airflow.static_babynames
LIMIT 100
""",
mysql_table='test_static_babynames',
mysql_preoperator=[
'DROP TABLE IF EXISTS test_static_babynames;',
'CREATE TABLE test_static_babynames (name VARCHAR(500))',
],
dag=self.dag)
t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive_to_mysql_bulk(self):
t = operators.HiveToMySqlTransfer(
mysql_conn_id='airflow_db',
task_id='hive_to_mysql_bulk_check',
create=True,
sql="""
SELECT name, gender
FROM airflow.static_babynames
LIMIT 100
""",
mysql_table='test_static_babynames',
mysql_preoperator=[
'DROP TABLE IF EXISTS test_static_babynames;',
"""
CREATE TABLE test_static_babynames (
name VARCHAR(500),
gender VARCHAR(500)
)""",
],
bulk_load=True,
dag=self.dag)
t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
class CoreTest(unittest.TestCase):
def setUp(self):
@ -714,5 +533,163 @@ class ConnectionTest(unittest.TestCase):
del os.environ['AIRFLOW_CONN_AIRFLOW_DB']
class TransferTests(unittest.TestCase):
def setUp(self):
configuration.test_mode()
args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag
def test_clear(self):
self.dag.clear(start_date=DEFAULT_DATE, end_date=datetime.now())
def test_mysql_to_hive(self):
sql = "SELECT * FROM task_instance LIMIT 1000;"
t = operators.MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id='airflow_db',
sql=sql,
hive_table='airflow.test_mysql_to_hive',
recreate=True,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_mysql_to_mysql(self):
sql = "SELECT * FROM task_instance LIMIT 1000;"
t = operators.GenericTransfer(
task_id='test_m2m',
preoperator=[
"DROP TABLE IF EXISTS test_mysql_to_mysql",
"CREATE TABLE IF NOT EXISTS "
"test_mysql_to_mysql LIKE task_instance"
],
source_conn_id='airflow_db',
destination_conn_id='airflow_db',
destination_table="test_mysql_to_mysql",
sql=sql,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_mysql_to_hive_partition(self):
sql = "SELECT * FROM task_instance LIMIT 1000;"
t = operators.MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id='airflow_db',
sql=sql,
hive_table='airflow.test_mysql_to_hive_part',
partition={'ds': '2015-01-02'},
recreate=False,
create=True,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
class HivePrestoTest(unittest.TestCase):
def setUp(self):
configuration.test_mode()
args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag
self.hql = """
USE airflow;
DROP TABLE IF EXISTS static_babynames_partitioned;
CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
state string,
year string,
name string,
gender string,
num int)
PARTITIONED BY (ds string);
INSERT OVERWRITE TABLE static_babynames_partitioned
PARTITION(ds='{{ ds }}')
SELECT state, year, name, gender, num FROM static_babynames;
"""
def test_hive(self):
t = operators.HiveOperator(
task_id='basic_hql', hql=self.hql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive_dryrun(self):
t = operators.HiveOperator(
task_id='basic_hql', hql=self.hql, dag=self.dag)
t.dry_run()
def test_beeline(self):
t = operators.HiveOperator(
task_id='beeline_hql', hive_cli_conn_id='beeline_default',
hql=self.hql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_presto(self):
sql = """
SELECT count(1) FROM airflow.static_babynames_partitioned;
"""
t = operators.PrestoCheckOperator(
task_id='presto_check', sql=sql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hdfs_sensor(self):
t = operators.HdfsSensor(
task_id='hdfs_sensor_check',
filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_sql_sensor(self):
t = operators.SqlSensor(
task_id='hdfs_sensor_check',
conn_id='presto_default',
sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive_stats(self):
t = operators.HiveStatsCollectionOperator(
task_id='hive_stats_check',
table="airflow.static_babynames_partitioned",
partition={'ds': '2015-01-01'},
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive_partition_sensor(self):
t = operators.HivePartitionSensor(
task_id='hive_partition_check',
table='airflow.static_babynames_partitioned',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive2samba(self):
if 'Hive2SambaOperator' in dir(operators):
t = operators.Hive2SambaOperator(
task_id='hive2samba_check',
samba_conn_id='tableau_samba',
hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
destination_filepath='test_airflow.csv',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_hive_to_mysql(self):
t = operators.HiveToMySqlTransfer(
mysql_conn_id='airflow_db',
task_id='hive_to_mysql_check',
create=True,
sql="""
SELECT name
FROM airflow.static_babynames
LIMIT 100
""",
mysql_table='test_static_babynames',
mysql_preoperator=[
'DROP TABLE IF EXISTS test_static_babynames;',
'CREATE TABLE test_static_babynames (name VARCHAR(500))',
],
dag=self.dag)
t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
if __name__ == '__main__':
unittest.main()