Build boilerplate from cookiecutter-python-etl

Ryan Harter 2017-03-15 15:14:05 -04:00
Commit d6261d4c98
9 changed files with 227 additions and 0 deletions

21
LICENSE Normal file

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017 Ryan Harter
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

5
README.md Normal file

@@ -0,0 +1,5 @@
# python_etl
Python ETL jobs for Firefox Telemetry to be scheduled on Airflow.
This project was created with [cookiecutter-python-etl](https://github.com/harterrt/cookiecutter-python-etl).
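A minimal usage sketch of the installed package, assuming an already-running SparkContext `sc` and SQLContext `sqlContext` (for example on a Telemetry analysis cluster); this mirrors how scheduling/airflow.py and the notebook in this commit invoke it:

from python_etl import main

# Pull a 10% sample of yesterday's nightly main pings and print the
# OS share of distinct clients.
main.etl_job(sc, sqlContext)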

1
python_etl/__init__.py Normal file

@@ -0,0 +1 @@
# Re-export main's public functions (e.g. etl_job, transform_pings) at the
# package level so `python_etl.etl_job(...)` works, as the notebook expects.
from .main import *

40
python_etl/main.py Normal file

@@ -0,0 +1,40 @@
from datetime import date, timedelta

from pyspark import Row

from moztelemetry import get_pings_properties
from moztelemetry.dataset import Dataset


def get_data(sc):
    pings = Dataset.from_source("telemetry") \
        .where(docType='main') \
        .where(submissionDate=(date.today() - timedelta(1)).strftime("%Y%m%d")) \
        .where(appUpdateChannel="nightly") \
        .records(sc, sample=0.1)

    return get_pings_properties(pings, ["clientId",
                                        "environment/system/os/name"])


def ping_to_row(ping):
    return Row(client_id=ping['clientId'],
               os=ping['environment/system/os/name'])


def transform_pings(pings):
    """Take an RDD of main pings and summarize OS share"""
    out = pings.map(ping_to_row).distinct() \
               .map(lambda x: x.os).countByValue()

    return dict(out)


def etl_job(sc, sqlContext):
    """This is the function that will be executed on the cluster"""
    results = transform_pings(get_data(sc))

    # Display results:
    total = sum(map(lambda x: x[1], results.items()))

    # Print each OS and its share of clients, in descending order:
    for pair in sorted(results.items(), key=lambda x: -x[1]):
        print "OS: {:<10} Percent: {:0.2f}%"\
            .format(pair[0], float(pair[1])/total*100)
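For illustration, a sketch of the record shape transform_pings receives after get_pings_properties flattening, and the summary it returns; the property paths come from get_data above, while the literal values are invented:

# Each ping arrives as a flat dict keyed by the requested property paths:
example_ping = {
    'clientId': 'a',
    'environment/system/os/name': 'windows',
}

# ping_to_row converts that to Row(client_id='a', os='windows'); after
# distinct() and countByValue(), transform_pings returns something like:
#     {'windows': 2, 'darwin': 1, 'linux': 1}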

10
scheduling/airflow.py Normal file

@@ -0,0 +1,10 @@
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from python_etl import main


if __name__ == "__main__":
    conf = SparkConf().setAppName('python_etl')
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    main.etl_job(sc, sqlContext)

10
scheduling/airflow.sh Normal file

@@ -0,0 +1,10 @@
#!/bin/bash
# We use jupyter by default, but here we want to use python
unset PYSPARK_DRIVER_PYTHON
# Clone, install, and run
git clone https://github.com/mozilla/python_etl.git python_etl
cd python_etl
pip install .
spark-submit scheduling/airflow.py


@@ -0,0 +1,75 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"repo_dir = \"tmp\"\n",
"repo_https_url = \"https://github.com/mozilla/python_etl.git\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"!git clone $repo_https_url $repo_dir && cd $repo_dir && python setup.py bdist_egg"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import os\n",
"distpath = repo_dir + '/dist'\n",
"sc.addPyFile(os.path.join(distpath, os.listdir(distpath)[0]))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [],
"source": [
"import python_etl\n",
"python_etl.etl_job(sc, sqlContext)"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [conda root]",
"language": "python",
"name": "conda-root-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
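For readability, the four notebook cells above amount to this sequence, reproduced here as plain code (a sketch of the same steps; `sc` and `sqlContext` are provided by the notebook kernel):

repo_dir = "tmp"
repo_https_url = "https://github.com/mozilla/python_etl.git"

# Cell 2 (shell magic): clone the repo and build an egg.
#   !git clone $repo_https_url $repo_dir && cd $repo_dir && python setup.py bdist_egg

# Cell 3: ship the freshly built egg to the Spark executors.
import os
distpath = repo_dir + '/dist'
sc.addPyFile(os.path.join(distpath, os.listdir(distpath)[0]))

# Cell 4: run the job.
import python_etl
python_etl.etl_job(sc, sqlContext)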

13
setup.py Normal file

@@ -0,0 +1,13 @@
#!/usr/bin/env python
from setuptools import setup, find_packages

setup(name='python_etl',
      version='0.1',
      description='Python ETL jobs for Firefox Telemetry to be scheduled on Airflow.',
      author='Ryan Harter',
      author_email='harterrt@mozilla.com',
      url='https://github.com/mozilla/python_etl.git',
      packages=find_packages(exclude=['tests']),
      )

52
tests/test_main.py Normal file

@@ -0,0 +1,52 @@
import pytest
from pyspark import SparkConf, SparkContext
from python_etl import *


# Initialize a spark context:
@pytest.fixture(scope="session")
def spark_context(request):
    conf = SparkConf().setMaster("local")\
        .setAppName("python_etl" + "_test")
    sc = SparkContext(conf=conf)

    # teardown
    request.addfinalizer(lambda: sc.stop())

    return sc


# Generate some data
def create_row(client_id, os):
    return {'clientId': client_id, 'environment/system/os/name': os}


def simple_data():
    raw_data = [('a', 'windows'),
                ('b', 'darwin'),
                ('c', 'linux'),
                ('d', 'windows')]

    return map(lambda raw: create_row(*raw), raw_data)


def duplicate_data():
    return simple_data() + simple_data()


@pytest.fixture
def simple_rdd(spark_context):
    return spark_context.parallelize(simple_data())


@pytest.fixture
def duplicate_rdd(spark_context):
    return spark_context.parallelize(duplicate_data())


# Tests
def test_simple_transform(simple_rdd):
    actual = transform_pings(simple_rdd)
    expected = {'windows': 2, 'darwin': 1, 'linux': 1}

    assert actual == expected


def test_duplicate_transform(duplicate_rdd):
    actual = transform_pings(duplicate_rdd)
    expected = {'windows': 2, 'darwin': 1, 'linux': 1}

    assert actual == expected