This commit is contained in:
Ben Wu 2020-12-16 11:51:45 -05:00 committed by GitHub
Parent 5756722cd5
Commit e42d0a9d23
No key matching this signature was found
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 203 additions and 163 deletions

View File

@@ -16,7 +16,7 @@ install_dependencies: &install_dependencies
   command: |
     apt update
     apt install -y libsnappy-dev openjdk-8-jre-headless
-    pip install tox coverage
+    pip install tox coverage==5.3

 save_cache_settings: &save_cache_settings
   key: v1-python_mozetl-{{ checksum "setup.py" }}
@@ -66,20 +66,15 @@ test_settings: &test_settings
 version: 2
 jobs:
-  py27:
+  py37:
     <<: *test_settings
     parallelism: 4
     docker:
-      - image: python:2.7-stretch
-  py35:
-    <<: *test_settings
-    parallelism: 4
-    docker:
-      - image: python:3.5-stretch
+      - image: python:3.7-stretch
   lint:
     docker:
-      - image: python:3.6-stretch
+      - image: python:3.7-stretch
     working_directory: ~/python_mozetl
     steps:
       - checkout
@@ -93,7 +88,7 @@ jobs:
   docs:
     docker:
-      - image: python:2.7-stretch
+      - image: python:3.7-stretch
     working_directory: ~/python_mozetl
     steps:
       - checkout
@@ -137,8 +132,7 @@ workflows:
   version: 2
   build:
     jobs:
-      - py27
-      - py35
+      - py37
       - lint
      - docs
      - docs-deploy:

View File

@@ -81,7 +81,7 @@ def extract_search_counts(frame):
     extracted = grouped.select(
         "did",
         F.col("sum(search_count_atom)").alias("search_count_all"),
-        *[F.col("sum({})".format(c)).alias(c) for c in SEARCH_ACCESS_COLUMNS]
+        *[F.col("sum({})".format(c)).alias(c) for c in SEARCH_ACCESS_COLUMNS],
     )
     # Create a homologous output row for each input row
     # where search_counts is NULL.
@@ -91,7 +91,7 @@ def extract_search_counts(frame):
         .select(
             "did",
             F.lit(0).alias("search_count_all"),
-            *[F.lit(0).alias(c) for c in SEARCH_ACCESS_COLUMNS]
+            *[F.lit(0).alias(c) for c in SEARCH_ACCESS_COLUMNS],
         )
     )
     intermediate = extracted.unionAll(nulls)

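The trailing comma added after the unpacked *[...] argument above (and after the **kwargs call sites later in this commit) is a Python 3-only construct: CPython 2.7 rejects a trailing comma after a * or ** unpacking in a call, so these could only land once the 2.7 job was dropped. A minimal sketch with a hypothetical describe helper, not from this repository:

def describe(*columns, **options):
    # Hypothetical helper that just echoes back its arguments.
    return list(columns), dict(options)


cols = ["a", "b"]
opts = {"strict": True}

# The trailing comma after the ** unpacking parses on Python 3.7
# but is a SyntaxError on Python 2.7.
result = describe(
    *cols,
    **opts,
)
assert result == (cols, opts)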
View File

@@ -155,8 +155,8 @@ class Prof(object):
 # Helpers.
 def fix_vendor(vendor_id):
-    if vendor_id == u"Intel Open Source Technology Center":
-        return u"0x8086"
+    if vendor_id == "Intel Open Source Technology Center":
+        return "0x8086"
     return vendor_id
@@ -427,7 +427,7 @@ class Trend(TrendBase):
         text = json.dumps(self.cache)
-        print("Writing file {0}".format(self.local_path, text))
+        print("Writing file {0}".format(self.local_path))
         with open(self.local_path, "w") as fp:
             fp.write(text)
@@ -621,9 +621,9 @@ if __name__ == "__main__":
                 WinArchTrend(),
                 WindowsVendorTrend(),
                 WindowsVistaPlusGroup([Direct2DTrend(), Direct3D11Trend()]),
-                DeviceGenTrend(u"0x8086", "intel"),
-                DeviceGenTrend(u"0x10de", "nvidia"),
-                DeviceGenTrend(u"0x1002", "amd"),
+                DeviceGenTrend("0x8086", "intel"),
+                DeviceGenTrend("0x10de", "nvidia"),
+                DeviceGenTrend("0x1002", "amd"),
             ]
         ),
     ]

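Dropping the u"" prefixes here (and in the test files further down) is purely cosmetic on Python 3, where every str literal is already unicode; PEP 414 keeps the prefix legal for compatibility, but it no longer changes the type. A quick illustration:

# On Python 3 the u prefix is accepted (PEP 414) but redundant.
assert u"0x8086" == "0x8086"
assert isinstance("0x8086", str)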
View File

@@ -372,7 +372,7 @@ def search_aggregates_etl(submission_date, bucket, prefix, **kwargs):
         prefix,
         SEARCH_AGGREGATES_VERSION,
         search_aggregates,
-        **kwargs
+        **kwargs,
     )
@@ -384,7 +384,7 @@ def search_clients_daily_etl(submission_date, bucket, prefix, **kwargs):
         SEARCH_CLIENTS_DAILY_VERSION,
         search_clients_daily,
         orderBy=["sample_id"],
-        **kwargs
+        **kwargs,
     )

View File

@@ -24,12 +24,12 @@ spark = SparkSession.builder.appName("modules-with-missing-symbols").getOrCreate
 sc.addPyFile("stemming-1.0.1/stemming/porter2.py")

-from crashcorrelations import (
+from crashcorrelations import (  # noqa E402
     utils,
     download_data,
     crash_deviations,
     comments,
-)  # noqa E402
+)

 # workaround airflow not able to different schedules for tasks in a dag

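Moving the noqa marker onto the opening line matters because flake8 reports E402 (module-level import not at top of file) against the first line of the import statement, so a suppression on the closing parenthesis has no effect. A small self-contained sketch using a standard-library import instead of crashcorrelations:

import sys

sys.path.insert(0, "vendored")  # hypothetical path tweak that forces a late import

# The suppression must sit on this first line of the statement, not on the ")".
from collections import (  # noqa: E402
    OrderedDict,
    defaultdict,
)

d = defaultdict(OrderedDict)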
View File

@@ -78,7 +78,7 @@ def get_df(spark, date_from):
 def get_addons_per_client(users_df, minimum_addons_count):
-    """ Extracts a DataFrame that contains one row
+    """Extracts a DataFrame that contains one row
     for each client along with the list of active add-on GUIDs.
     """
@@ -328,8 +328,7 @@ def today_minus_7_days():
 def verify_valid_coefs(coefs):
-    """ verify that the model has proper floating point values (> 0)
-    """
+    """verify that the model has proper floating point values (> 0)"""
     assert "ensemble_weights" in coefs
     weights = coefs["ensemble_weights"]
@@ -362,9 +361,9 @@ def verify_valid_coefs(coefs):
 class CostLLR:
-    """ based on Niko Brummer's original implementation:
+    """based on Niko Brummer's original implementation:
     Niko Brummer and Johan du Preez, Application-Independent Evaluation of Speaker Detection"
     Computer Speech and Language, 2005
     """

     def __init__(self):
@@ -417,8 +416,8 @@ class CostLLR:
 def cross_validation_split(dataset, k_folds):
     """
     Splits dataframe into k_folds, returning array of dataframes
     """
     dataset_split = []
     h = 1.0 / k_folds
     df = dataset.select("*", rand().alias("rand"))

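The cross_validation_split context above attaches a uniform rand column before splitting. The loop body is not part of this hunk, so the following is only a hedged sketch of how such a k-fold split is typically completed from that point:

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand


def cross_validation_split(dataset, k_folds):
    # Attach a uniform [0, 1) column, then keep the rows whose value falls
    # into each of k equal-width buckets.
    df = dataset.select("*", rand().alias("rand"))
    h = 1.0 / k_folds
    return [
        df.filter((df["rand"] >= i * h) & (df["rand"] < (i + 1) * h)).drop("rand")
        for i in range(k_folds)
    ]


if __name__ == "__main__":
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    folds = cross_validation_split(spark.range(100), k_folds=5)
    # Floating-point bucket edges can, in principle, drop a boundary row.
    assert sum(fold.count() for fold in folds) <= 100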
View File

@@ -27,7 +27,7 @@ ONE_WEEK_AGO = (dt.datetime.now() - dt.timedelta(days=7)).strftime("%Y%m%d")
 def is_valid_addon(broadcast_amo_whitelist, guid, addon):
-    """ Filter individual addons out to exclude, system addons,
+    """Filter individual addons out to exclude, system addons,
     legacy addons, disabled addons, sideloaded addons.
     """
     return not (
@@ -47,7 +47,7 @@ def is_valid_addon(broadcast_amo_whitelist, guid, addon):
 def get_addons_per_client(broadcast_amo_whitelist, users_df):
-    """ Extracts a DataFrame that contains one row
+    """Extracts a DataFrame that contains one row
     for each client along with the list of active add-on GUIDs.
     """
@@ -71,7 +71,7 @@ def get_addons_per_client(broadcast_amo_whitelist, users_df):
 def get_initial_sample(spark):
-    """ Takes an initial sample from the longitudinal dataset
+    """Takes an initial sample from the longitudinal dataset
     (randomly sampled from main summary). Coarse filtering on:
     - number of installed addons (greater than 1)
     - corrupt and generally wierd telemetry entries
@@ -92,8 +92,7 @@ def get_initial_sample(spark):
 def extract_telemetry(spark):
-    """ load some training data from telemetry given a sparkContext
-    """
+    """load some training data from telemetry given a sparkContext"""
     sc = spark.sparkContext

     # Define the set of feature names to be used in the donor computations.

View File

@@ -18,8 +18,7 @@ OUTPUT_BASE_FILENAME = "guid_install_ranking"
 def extract_telemetry(sparkSession):
-    """ Load some training data from telemetry given a sparkContext
-    """
+    """Load some training data from telemetry given a sparkContext"""
     frame = sparkSession.sql(
         """
         SELECT
@@ -42,7 +41,7 @@ def extract_telemetry(sparkSession):
 def transform(frame):
-    """ Convert the dataframe to JSON and augment each record to
+    """Convert the dataframe to JSON and augment each record to
     include the install count for each addon.
     """

View File

@@ -86,7 +86,7 @@ def get_samples(spark, date_from):
 def get_addons_per_client(users_df, addon_whitelist, minimum_addons_count):
-    """ Extracts a DataFrame that contains one row
+    """Extracts a DataFrame that contains one row
     for each client along with the list of active add-on GUIDs.
     """
@@ -125,8 +125,7 @@ def get_addons_per_client(users_df, addon_whitelist, minimum_addons_count):
 def compute_clusters(addons_df, num_clusters, random_seed):
-    """ Performs user clustering by using add-on ids as features.
-    """
+    """Performs user clustering by using add-on ids as features."""
     # Build the stages of the pipeline. We need hashing to make the next
     # steps work.
@@ -145,8 +144,7 @@ def compute_clusters(addons_df, num_clusters, random_seed):
 def get_donor_pools(users_df, clusters_df, num_donors, random_seed=None):
-    """ Samples users from each cluster.
-    """
+    """Samples users from each cluster."""
     cluster_population = clusters_df.groupBy("prediction").count().collect()
     clusters_histogram = [(x["prediction"], x["count"]) for x in cluster_population]
@@ -216,7 +214,7 @@ def format_donors_dictionary(donors_df):
 def similarity_function(x, y):
-    """ Similarity function for comparing user features.
+    """Similarity function for comparing user features.
     This actually really should be implemented in taar.similarity_recommender
     and then imported here for consistency.
@@ -260,7 +258,7 @@ def generate_non_cartesian_pairs(first_rdd, second_rdd):
 def get_lr_curves(
     spark, features_df, cluster_ids, kernel_bandwidth, num_pdf_points, random_seed=None
 ):
-    """ Compute the likelihood ratio curves for clustered clients.
+    """Compute the likelihood ratio curves for clustered clients.
     Work-flow followed in this function is as follows:

View File

@@ -88,7 +88,7 @@ def store_json_to_s3(json_data, base_filename, date, prefix, bucket):
 def load_amo_external_whitelist():
-    """ Download and parse the AMO add-on whitelist.
+    """Download and parse the AMO add-on whitelist.

     :raises RuntimeError: the AMO whitelist file cannot be downloaded or contains
     no valid add-ons.
@@ -133,8 +133,8 @@ def load_amo_curated_whitelist():
 def hash_telemetry_id(telemetry_id):
     """
     This hashing function is a reference implementation based on :
     https://phabricator.services.mozilla.com/D8311
     """
     return hashlib.sha256(telemetry_id.encode("utf8")).hexdigest()

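For reference, hash_telemetry_id (shown in context above) hashes the UTF-8 encoding of the identifier with SHA-256 and returns a hex digest. A quick check with an illustrative input:

import hashlib


def hash_telemetry_id(telemetry_id):
    return hashlib.sha256(telemetry_id.encode("utf8")).hexdigest()


# Same digest as hashing the UTF-8 bytes directly; always 64 hex characters.
assert hash_telemetry_id("client-id") == hashlib.sha256(b"client-id").hexdigest()
assert len(hash_telemetry_id("client-id")) == 64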
View File

@@ -44,7 +44,7 @@ def generate_filter_parameters(end_date, days_back):
 def write_csv(dataframe, path, header=True):
-    """ Write a dataframe to local disk.
+    """Write a dataframe to local disk.

     Disclaimer: Do not write csv files larger than driver memory. This
     is ~15GB for ec2 c3.xlarge (due to caching overhead).

View File

@@ -2,13 +2,13 @@
 from setuptools import setup, find_packages

 test_deps = [
-    'coverage==4.5.2',
+    'coverage==5.3',
     'pytest-cov==2.6.0',
     'pytest-timeout==1.3.3',
     'moto==1.3.16',
     'mock==2.0.0',
     'pytest==3.10.1',
-    'flake8==3.6.0'
+    'flake8==3.8.4'
 ]

 extras = {
@@ -30,17 +30,17 @@ setup(
         'boto==2.49.0',
         'boto3==1.16.20',
         'botocore==1.19.20',
-        'click==6.7',
+        'click==7.1.2',
         'click_datetime==0.2',
-        'numpy==1.13.3',
-        'pandas==0.23.4',
+        'numpy==1.19.4',
+        'pandas==1.1.4',
         'pyspark==2.3.2',
         'python_moztelemetry==0.10.2',
-        'requests-toolbelt==0.8.0',
-        'requests==2.20.1',
-        'scipy==1.0.0rc1',
+        'requests-toolbelt==0.9.1',
+        'requests==2.25.0',
+        'scipy==1.5.4',
         'typing==3.6.4',
-        'six==1.11.0',
+        'six==1.15.0',
     ],
     tests_require=test_deps,
     extras_require=extras,

View File

@@ -66,44 +66,44 @@ def test_profile_creation_date_fields(clients_daily):
     # the TZ setting of the system on which the tests run.
     expected_back = set(
         [
-            u"2014-12-16",
-            u"2016-09-07",
-            u"2016-05-12",
-            u"2017-02-16",
-            u"2012-11-17",
-            u"2013-09-08",
-            u"2017-02-12",
-            u"2016-04-04",
-            u"2017-04-25",
-            u"2015-06-17",
+            "2014-12-16",
+            "2016-09-07",
+            "2016-05-12",
+            "2017-02-16",
+            "2012-11-17",
+            "2013-09-08",
+            "2017-02-12",
+            "2016-04-04",
+            "2017-04-25",
+            "2015-06-17",
         ]
     )
     expected_utc = set(
         [
-            u"2014-12-17",
-            u"2016-09-08",
-            u"2016-05-13",
-            u"2017-02-17",
-            u"2012-11-18",
-            u"2013-09-09",
-            u"2017-02-13",
-            u"2016-04-05",
-            u"2017-04-26",
-            u"2015-06-18",
+            "2014-12-17",
+            "2016-09-08",
+            "2016-05-13",
+            "2017-02-17",
+            "2012-11-18",
+            "2013-09-09",
+            "2017-02-13",
+            "2016-04-05",
+            "2017-04-26",
+            "2015-06-18",
         ]
     )
     expected_forward = set(
         [
-            u"2014-12-18",
-            u"2016-09-09",
-            u"2016-05-14",
-            u"2017-02-18",
-            u"2012-11-19",
-            u"2013-09-10",
-            u"2017-02-14",
-            u"2016-04-06",
-            u"2017-04-27",
-            u"2015-06-19",
+            "2014-12-18",
+            "2016-09-09",
+            "2016-05-14",
+            "2017-02-18",
+            "2012-11-19",
+            "2013-09-10",
+            "2017-02-14",
+            "2016-04-06",
+            "2017-04-27",
+            "2015-06-19",
         ]
     )
     ten_pcds = clients_daily.select("profile_creation_date").take(10)

View File

@@ -15,19 +15,19 @@ def sample_document():
         # string before passing through to the sampler code.
         "content": {"payload": {"foo": "bar"}},
         "meta": {
-            u"Content-Length": u"7094",
-            u"Date": u"Sun, 19 Aug 2018 15:08:00 GMT",
-            u"Host": u"incoming.telemetry.mozilla.org",
-            "Hostname": u"ip-1.1.1.1",
+            "Content-Length": "7094",
+            "Date": "Sun, 19 Aug 2018 15:08:00 GMT",
+            "Host": "incoming.telemetry.mozilla.org",
+            "Hostname": "ip-1.1.1.1",
             "Timestamp": 1534691279765301222,
-            "Type": u"telemetry-raw",
-            u"User-Agent": u"pingsender/1.0",
-            u"X-Forwarded-For": u"127.0.0.1",
-            u"X-PingSender-Version": u"1.0",
-            u"args": u"v=4",
-            u"protocol": u"HTTP/1.1",
-            u"remote_addr": u"1.1.1.1",
-            u"uri": u"/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231",
+            "Type": "telemetry-raw",
+            "User-Agent": "pingsender/1.0",
+            "X-Forwarded-For": "127.0.0.1",
+            "X-PingSender-Version": "1.0",
+            "args": "v=4",
+            "protocol": "HTTP/1.1",
+            "remote_addr": "1.1.1.1",
+            "uri": "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231",
         },
     }

View File

@@ -10,7 +10,7 @@ from pyspark.sql.types import ArrayType, LongType, StringType, StructField, Stru
 @pytest.fixture()
 def sync_summary_schema():
-    """"Generate a schema for sync_summary. This subset contains enough
+    """Generate a schema for sync_summary. This subset contains enough
     structure for testing bookmark validation. The schema is derived from
     [`telemetry-batch-view`][1].

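The extra quotation mark removed here was not a syntax error: a docstring opened with four quotes parses as a triple-quoted string whose text begins with a stray " character, which then leaks into help() output and rendered docs. A tiny demonstration with hypothetical fixtures:

def fixture_with_typo():
    """"Generate a schema (note the stray leading quote)."""


def fixture_fixed():
    """Generate a schema."""


assert fixture_with_typo.__doc__.startswith('"')
assert fixture_fixed.__doc__.startswith("G")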
View File

@@ -241,10 +241,16 @@ EXPECTED_FINAL_JDATA = {
 @pytest.yield_fixture(scope="function")
 def s3_fixture():
-    mock_s3().start()
+    s3 = mock_s3()
+    s3.start()
     conn = boto3.resource("s3", region_name="us-west-2")
-    conn.create_bucket(Bucket=taar_amowhitelist.AMO_DUMP_BUCKET)
+    conn.create_bucket(
+        Bucket=taar_amowhitelist.AMO_DUMP_BUCKET,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )
     taar_utils.store_json_to_s3(
         json.dumps(SAMPLE_DATA),
         taar_amowhitelist.AMO_DUMP_BASE_FILENAME,
@@ -253,7 +259,7 @@ def s3_fixture():
         taar_amowhitelist.AMO_DUMP_BUCKET,
     )
     yield conn, SAMPLE_DATA
-    mock_s3().stop()
+    s3.stop()


 def test_extract(s3_fixture):

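Two things change in this fixture: a single mock_s3() object is kept in a local variable so the mock that was started is also the one that gets stopped (previously mock_s3().stop() built a second, never-started mock), and create_bucket now passes a LocationConstraint, which S3 and the stricter recent moto/botocore releases require for any region other than us-east-1. A hedged sketch of the same pattern with an illustrative bucket name:

import boto3
import pytest
from moto import mock_s3


@pytest.fixture
def s3_resource():
    # Keep one mock instance so start() and stop() act on the same object.
    s3 = mock_s3()
    s3.start()
    conn = boto3.resource("s3", region_name="us-west-2")
    conn.create_bucket(
        Bucket="example-test-bucket",  # illustrative name, not from the repo
        CreateBucketConfiguration={"LocationConstraint": "us-west-2"},
    )
    yield conn
    s3.stop()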
View File

@@ -108,44 +108,35 @@ MOCK_KEYED_ADDONS = [
 EXPECTED_GUID_GUID_DATA = [
     Row(
-        key_addon=u"test-guid-2",
+        key_addon="test-guid-2",
         coinstallation_counts=[
-            Row(id=u"test-guid-6", n=1),
-            Row(id=u"test-guid-5", n=1),
-            Row(id=u"test-guid-3", n=1),
-            Row(id=u"test-guid-1", n=1),
+            Row(id="test-guid-6", n=1),
+            Row(id="test-guid-5", n=1),
+            Row(id="test-guid-3", n=1),
+            Row(id="test-guid-1", n=1),
         ],
     ),
-    Row(key_addon=u"test-guid-4", coinstallation_counts=[Row(id=u"test-guid-1", n=1)]),
+    Row(key_addon="test-guid-4", coinstallation_counts=[Row(id="test-guid-1", n=1)]),
     Row(
-        key_addon=u"test-guid-3",
+        key_addon="test-guid-3",
+        coinstallation_counts=[Row(id="test-guid-2", n=1), Row(id="test-guid-1", n=2)],
+    ),
+    Row(
+        key_addon="test-guid-5",
+        coinstallation_counts=[Row(id="test-guid-6", n=1), Row(id="test-guid-2", n=1)],
+    ),
+    Row(
+        key_addon="test-guid-1",
         coinstallation_counts=[
-            Row(id=u"test-guid-2", n=1),
-            Row(id=u"test-guid-1", n=2),
+            Row(id="test-guid-2", n=1),
+            Row(id="test-guid-1", n=2),
+            Row(id="test-guid-3", n=2),
+            Row(id="test-guid-4", n=1),
         ],
     ),
     Row(
-        key_addon=u"test-guid-5",
-        coinstallation_counts=[
-            Row(id=u"test-guid-6", n=1),
-            Row(id=u"test-guid-2", n=1),
-        ],
-    ),
-    Row(
-        key_addon=u"test-guid-1",
-        coinstallation_counts=[
-            Row(id=u"test-guid-2", n=1),
-            Row(id=u"test-guid-1", n=2),
-            Row(id=u"test-guid-3", n=2),
-            Row(id=u"test-guid-4", n=1),
-        ],
-    ),
-    Row(
-        key_addon=u"test-guid-6",
-        coinstallation_counts=[
-            Row(id=u"test-guid-2", n=1),
-            Row(id=u"test-guid-5", n=1),
-        ],
+        key_addon="test-guid-6",
+        coinstallation_counts=[Row(id="test-guid-2", n=1), Row(id="test-guid-5", n=1)],
     ),
 ]
@@ -236,7 +227,12 @@ def test_load_s3(spark):
     # Create the bucket before we upload
     conn = boto3.resource("s3", region_name="us-west-2")
-    bucket_obj = conn.create_bucket(Bucket=BUCKET)
+    bucket_obj = conn.create_bucket(
+        Bucket=BUCKET,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )

     load_df = spark.createDataFrame(EXPECTED_GUID_GUID_DATA)
     taar_lite_guidguid.load_s3(load_df, "20180301", PREFIX, BUCKET)

View File

@@ -55,10 +55,10 @@ MOCK_TELEMETRY_SAMPLE = [
 ]

 EXPECTED_ADDON_INSTALLATIONS = {
-    u"test-guid-1": 100,
-    u"test-guid-2": 200,
-    u"test-guid-3": 300,
-    u"test-guid-4": 400,
+    "test-guid-1": 100,
+    "test-guid-2": 200,
+    "test-guid-3": 300,
+    "test-guid-4": 400,
 }
@@ -74,12 +74,12 @@ def test_extract_phase(spark):
     output = dict(extract_df.rdd.map(lambda_func).collect())

     EXPECTED = {
-        u"test-guid-1": 1,
-        u"test-guid-2": 3,
-        u"test-guid-3": 3,
-        u"test-guid-4": 2,
-        u"test-guid-5": 2,
-        u"test-guid-6": 1,
+        "test-guid-1": 1,
+        "test-guid-2": 3,
+        "test-guid-3": 3,
+        "test-guid-4": 2,
+        "test-guid-5": 2,
+        "test-guid-6": 1,
     }

     assert EXPECTED == output
@@ -106,7 +106,12 @@ def test_load_s3(spark):
     # Create the bucket before we upload
     conn = boto3.resource("s3", region_name="us-west-2")
-    bucket_obj = conn.create_bucket(Bucket=BUCKET)
+    bucket_obj = conn.create_bucket(
+        Bucket=BUCKET,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )

     rdd = spark.createDataFrame(MOCK_TELEMETRY_SAMPLE)
     result_json = taar_lite_guidranking.transform(rdd)

View File

@@ -85,7 +85,12 @@ def test_load(mock_transformed_data):
     date = "20190105"

     conn = boto3.resource("s3", region_name="us-west-2")
-    conn.create_bucket(Bucket=bucket)
+    conn.create_bucket(
+        Bucket=bucket,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )

     taar_update_whitelist.load_etl(mock_transformed_data, date, prefix, bucket)

View File

@@ -50,7 +50,12 @@ def test_read_from_s3():
     s3_json_fname = "test.json"

     conn = boto3.resource("s3", region_name="us-west-2")
-    conn.create_bucket(Bucket=bucket)
+    conn.create_bucket(
+        Bucket=bucket,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )

     with NamedTemporaryFile("w") as json_file:
         json.dump(SAMPLE_DATA, json_file)
@@ -71,7 +76,12 @@ def test_write_to_s3():
     dest_filename = "test.json"

     conn = boto3.resource("s3", region_name="us-west-2")
-    bucket_obj = conn.create_bucket(Bucket=bucket)
+    bucket_obj = conn.create_bucket(
+        Bucket=bucket,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )

     with NamedTemporaryFile("w") as json_file:
         json.dump(SAMPLE_DATA, json_file)
@@ -102,7 +112,12 @@ def test_write_json_s3():
     content = {"it-IT": ["firefox@getpocket.com"]}

     conn = boto3.resource("s3", region_name="us-west-2")
-    bucket_obj = conn.create_bucket(Bucket=bucket)
+    bucket_obj = conn.create_bucket(
+        Bucket=bucket,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )

     # Store the data in the mocked bucket.
     taar_utils.store_json_to_s3(
@@ -123,7 +138,12 @@ def test_write_json_s3():
 @mock_s3
 def test_load_amo_external_whitelist():
     conn = boto3.resource("s3", region_name="us-west-2")
-    conn.create_bucket(Bucket=taar_utils.AMO_DUMP_BUCKET)
+    conn.create_bucket(
+        Bucket=taar_utils.AMO_DUMP_BUCKET,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )

     # Make sure that whitelist loading fails before mocking the S3 file.
     EXCEPTION_MSG = "Empty AMO whitelist detected"

View File

@@ -31,7 +31,9 @@ def test_write_csv_ascii(generate_data, tmpdir):
     with open(path, "rb") as f:
         data = f.read()
-    assert [l.decode("utf-8") for l in data.rstrip().split(b"\r\n")[1:]] == test_data
+    assert [
+        line.decode("utf-8") for line in data.rstrip().split(b"\r\n")[1:]
+    ] == test_data


 def test_generate_filter_parameters():
@@ -58,7 +60,7 @@ def test_generate_filter_parameters():
 def test_write_csv_valid_unicode(generate_data, tmpdir):
-    test_data = [u"", u"", u""]
+    test_data = ["", "", ""]
     df = generate_data(test_data)

     path = str(tmpdir.join("test_data.csv"))
@@ -67,7 +69,9 @@ def test_write_csv_valid_unicode(generate_data, tmpdir):
     with open(path, "rb") as f:
         data = f.read()
-    assert [l.decode("utf-8") for l in data.rstrip().split(b"\r\n")[1:]] == test_data
+    assert [
+        line.decode("utf-8") for line in data.rstrip().split(b"\r\n")[1:]
+    ] == test_data


 @mock_s3
@@ -76,7 +80,12 @@ def test_write_csv_to_s3(generate_data):
     key = "test.csv"

     conn = boto3.resource("s3", region_name="us-west-2")
-    conn.create_bucket(Bucket=bucket)
+    conn.create_bucket(
+        Bucket=bucket,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )

     utils.write_csv_to_s3(generate_data(["foo"]), bucket, key)
@@ -92,7 +101,12 @@ def test_write_csv_to_s3_no_header(generate_data):
     key = "test.csv"

     conn = boto3.resource("s3", region_name="us-west-2")
-    conn.create_bucket(Bucket=bucket)
+    conn.create_bucket(
+        Bucket=bucket,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )

     utils.write_csv_to_s3(generate_data(), bucket, key, header=False)
@@ -107,7 +121,12 @@ def test_write_csv_to_s3_existing(generate_data):
     key = "test.csv"

     conn = boto3.resource("s3", region_name="us-west-2")
-    conn.create_bucket(Bucket=bucket)
+    conn.create_bucket(
+        Bucket=bucket,
+        CreateBucketConfiguration={
+            "LocationConstraint": "us-west-2",
+        },
+    )

     utils.write_csv_to_s3(generate_data(["foo"]), bucket, key)
     utils.write_csv_to_s3(generate_data(["foo", "bar"]), bucket, key)

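The comprehension rewrites above do two things: the loop variable l becomes line, which sidesteps flake8's ambiguous-name check (E741) and reads better, and the now-longer expression is wrapped to stay within the line-length limit. A tiny standalone illustration with made-up CSV bytes:

data = b"header\r\nfoo\r\nbar\r\n"
rows = [
    line.decode("utf-8") for line in data.rstrip().split(b"\r\n")[1:]
]
assert rows == ["foo", "bar"]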
View File

@@ -4,7 +4,7 @@
 # and then run "tox" from this directory.

 [tox]
-envlist = py27, py35, flake8, black, docs
+envlist = py37, flake8, black, docs

 [pytest]
 addopts =
@@ -20,17 +20,17 @@ max-line-length = 100
 [testenv:flake8]
 deps =
-    flake8==3.6.0
+    flake8==3.8.4
 commands =
     flake8 mozetl tests

 [testenv:black]
-deps = black
+deps = black==20.8b1
 commands = black --check mozetl/ tests/

 [testenv:docs]
 description = invoke sphinx-build to build HTML docs
-basepython = python2.7
+basepython = python3.7
 deps =
     sphinx >= 1.7.5, < 2
     m2r