- move Spark test fixture into pytest conftest.py module
- rename basic_etl to just basic
This commit is contained in:
Jannis Leidel 2017-04-25 16:27:38 +02:00 коммит произвёл Ryan Harter
Родитель 3ab6a9890b
Коммит bad1e83ca9
21 изменённых файлов: 44 добавлений и 51 удалений

Просмотреть файл

@ -8,15 +8,15 @@ addons:
apt:
packages:
- libsnappy-dev
before_install:
before_install:
- export PATH=$HOME/.local/bin:$PATH
install:
install:
- pip install tox-travis codecov
- "[ -f spark ] || mkdir spark && cd spark && wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.7.tgz && cd .."
- tar -xf ./spark/spark-2.0.0-bin-hadoop2.7.tgz
- export SPARK_HOME=`pwd`/spark-2.0.0-bin-hadoop2.7
- export PYTHONPATH=${SPARK_HOME}/python/:$(echo ${SPARK_HOME}/python/lib/py4j-*-src.zip):${PYTHONPATH}
script:
script:
- tox
after_success:
- codecov

1
mozetl/__init__.py Normal file
Просмотреть файл

@ -0,0 +1 @@
from .main import * # noqa

1
mozetl/basic/__init__.py Normal file
Просмотреть файл

@ -0,0 +1 @@
from .transform import * # noqa

Просмотреть файл

Просмотреть файл

Просмотреть файл

Просмотреть файл

@ -1,7 +1,7 @@
from pyspark.sql.types import StringType, LongType
from ..basic import convert_pings, DataFrameConfig
from .utils import testpilot_etl_boilerplate
from ..basic_etl import convert_pings, DataFrameConfig
def transform_pings(sqlContext, pings):

Просмотреть файл

@ -4,7 +4,7 @@ from pyspark.sql.types import (LongType, DoubleType, BooleanType, StringType,
StructField)
from pyspark.sql import Row
from ..basic_etl import DataFrameConfig, convert_pings
from ..basic import DataFrameConfig, convert_pings
from .utils import testpilot_etl_boilerplate

Просмотреть файл

Просмотреть файл

@ -1 +0,0 @@
from .main import *

Просмотреть файл

@ -1 +0,0 @@
from .transform import *

Просмотреть файл

@ -1,10 +1,10 @@
import mozetl
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from python_etl import main
if __name__ == "__main__":
conf = SparkConf().setAppName('python_etl')
conf = SparkConf().setAppName('python_mozetl')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
main.etl_job(sc, sqlContext)
mozetl.etl_job(sc, sqlContext)

Просмотреть файл

@ -4,7 +4,5 @@
unset PYSPARK_DRIVER_PYTHON
# Clone, install, and run
git clone https://github.com/mozilla/python_etl.git python_etl
cd python_etl
pip install .
pip install git+https://github.com/mozilla/python_mozetl.git
spark-submit scheduling/airflow.py

Просмотреть файл

@ -9,7 +9,7 @@
"outputs": [],
"source": [
"repo_dir = \"tmp\"\n",
"repo_https_url = \"https://github.com/mozilla/python_etl.git\""
"repo_https_url = \"https://github.com/mozilla/python_mozetl.git\""
]
},
{
@ -45,8 +45,8 @@
},
"outputs": [],
"source": [
"import python_etl\n",
"python_etl.etl_job(sc, sqlContext)"
"import mozetl\n",
"mozetl.etl_job(sc, sqlContext)"
]
}
],

Просмотреть файл

@ -1,7 +1,7 @@
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from python_etl.testpilot import pulse
from mozetl.testpilot import pulse
if __name__ == "__main__":
conf = SparkConf().setAppName('pulse_etl')

Просмотреть файл

@ -1,21 +1,15 @@
#!/usr/bin/env python
from setuptools import setup, find_packages
from distutils.core import setup
setup(name='python_etl',
version='0.1',
description='Python ETL jobs for Firefox Telemetry to be scheduled on Airflow.',
author='Ryan Harter',
author_email='harterrt@mozilla.com',
url='https://github.com/mozilla/python_etl.git',
packages=find_packages(exclude=['tests']),
install_requires=[
'python_moztelemetry',
],
test_requires=[
'pytest',
'coverage',
'pytest-cov'
]
)
setup(
name='mozetl',
version='0.1',
description='Python ETL jobs for Firefox Telemetry to be scheduled on Airflow.',
author='Ryan Harter',
author_email='harterrt@mozilla.com',
url='https://github.com/mozilla/python_mozetl.git',
packages=find_packages(exclude=['tests']),
install_requires=[
'python_moztelemetry', # TODO: pin version
],
)

Просмотреть файл

@ -7,7 +7,7 @@ def spark_context(request):
"""Initialize a spark context"""
spark = (SparkSession
.builder
.appName("python_etl_test")
.appName("python_mozetl_test")
.getOrCreate())
sc = spark.sparkContext
@ -18,6 +18,9 @@ def spark_context(request):
return sc
def row_to_dict(row):
@pytest.fixture(scope="session")
def row_to_dict():
"""Convert pyspark.Row to dict for easier unordered comparison"""
return {key: row[key] for key in row.__fields__}
def func(row):
return {key: row[key] for key in row.__fields__}
return func

Просмотреть файл

@ -1,6 +1,5 @@
from pyspark.sql import SQLContext
from python_etl.testpilot.containers import transform_pings
from utils import row_to_dict, spark_context
from mozetl.testpilot.containers import transform_pings
def create_ping_rdd(sc, payload):
@ -24,7 +23,7 @@ def create_row(overrides):
return {key: overrides.get(key, None) for key in keys}
def test_open_container_ping(spark_context):
def test_open_container_ping(row_to_dict, spark_context):
input_payload = {
'uuid': 'a',
'userContextId': 10,
@ -44,7 +43,7 @@ def test_open_container_ping(spark_context):
assert row_to_dict(actual) == create_row(result_payload)
def test_edit_container_ping(spark_context):
def test_edit_container_ping(row_to_dict, spark_context):
input_payload = {
'uuid': 'b',
'event': 'edit-containers'
@ -58,7 +57,7 @@ def test_edit_container_ping(spark_context):
assert row_to_dict(actual) == create_row(input_payload)
def test_hide_container_ping(spark_context):
def test_hide_container_ping(row_to_dict, spark_context):
input_payload = {
'uuid': 'a',
'userContextId': 'firefox-default',

Просмотреть файл

@ -1,7 +1,6 @@
import pytest
from pyspark.sql import SparkSession
from python_etl import *
from utils import *
from mozetl import transform_pings
# Generate some data
def create_row(client_id, os):

Просмотреть файл

@ -1,8 +1,8 @@
import pytest
import json
from pyspark.sql import SQLContext, Row
from python_etl.testpilot.pulse import transform_pings
from utils import spark_context, row_to_dict
from mozetl.testpilot.pulse import transform_pings
def create_row():
@ -15,7 +15,7 @@ def simple_rdd(spark_context):
return spark_context.parallelize([create_row()])
def test_simple_transform(simple_rdd, spark_context):
def test_simple_transform(row_to_dict, simple_rdd, spark_context):
actual = transform_pings(SQLContext(spark_context), simple_rdd).take(1)[0]
empty_request_keys = ["sub_frame", "stylesheet", "script", "image",

Просмотреть файл

@ -7,13 +7,13 @@
envlist = py27
[pytest]
addopts = --cov=python_etl tests/
addopts = --cov=mozetl tests/
[testenv]
sitepackages = True
commands = pytest
passenv = PYTHONPATH
setenv =
PYTHONPATH = {env:PYTHONPATH}
SPARK_HOME = {env:SPARK_HOME}
deps =
pytest