Create a dataset providing one flat row per (client_id, subsession_start_date) in main_summary.

Sam Penrose 2017-06-15 07:44:49 -07:00
Parent 63fd068d4c
Commit b9368fb84b
13 changed files with 2003 additions and 64 deletions
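In outline, the new job buckets main_summary pings by client_id and the calendar day taken from subsession_start_date, then collapses each bucket into one row using the aggregator list defined below. A minimal, self-contained sketch of that shape follows; the toy rows and the two aggregations shown are illustrative only, the real job applies the full MAIN_SUMMARY_FIELD_AGGREGATORS list.

# Sketch of the rollup this commit introduces: one output row per
# (client_id, calendar day of subsession_start_date). Toy data only.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("clients-daily-sketch").getOrCreate()
pings = spark.createDataFrame(
    [("a", "2017-06-01T10:00:00.0+02:00", 3600, "doc1"),
     ("a", "2017-06-01T20:00:00.0+02:00", 1800, "doc2"),
     ("a", "2017-06-02T09:00:00.0+02:00", 600, "doc3")],
    ["client_id", "subsession_start_date", "subsession_length", "document_id"])
daily = (
    pings
    .select("*", F.expr("substr(subsession_start_date, 1, 10)")
                  .alias("activity_date"))
    .groupby("client_id", "activity_date")
    .agg(F.sum(F.expr("subsession_length/3600.0")).alias("subsession_hours_sum"),
         F.countDistinct("document_id").alias("pings_aggregated_by_this_row")))
daily.show()  # client "a" collapses to one row each for 2017-06-01 and 2017-06-02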

.gitignore (vendored, 2 changes)

@@ -1,9 +1,11 @@
*.pyc
derby.log
dist/
build/
.tox/
.coverage
.cache/
metastore_db/
*.egg-info/

mozetl/clientsdaily/fields.py (new file)

@@ -0,0 +1,177 @@
from collections import defaultdict
import pyspark.sql.functions as F
VERSION = 1
def mode(l, empty_value='MISSING'):
if not l:
return empty_value
counts = defaultdict(int)
for value in l:
counts[value] += 1
counts = [(v, k) for (k, v) in counts.items()]
counts.sort()
return counts[-1][1]
MAIN_SUMMARY_FIELD_AGGREGATORS = [
F.sum('aborts_content').alias('aborts_content_sum'),
F.sum('aborts_gmplugin').alias('aborts_gmplugin_sum'),
F.sum('aborts_plugin').alias('aborts_plugin_sum'),
# active_addons
F.mean('active_addons_count').alias('active_addons_count_mean'),
F.first('active_experiment_branch').alias('active_experiment_branch'),
F.first('active_experiment_id').alias('active_experiment_id'),
# active_theme
F.sum(F.expr('active_ticks/(3600.0/5)')).alias('active_hours_sum'),
F.first('addon_compatibility_check_enabled').alias(
'addon_compatibility_check_enabled'),
F.first('app_build_id').alias('app_build_id'),
F.first('app_display_version').alias('app_display_version'),
F.first('app_name').alias('app_name'),
F.first('app_version').alias('app_version'),
# attribution
F.first('blocklist_enabled').alias('blocklist_enabled'),
F.first('channel').alias('channel'),
F.first('city').alias('city'),
F.first('country').alias('country'),
F.sum('crashes_detected_content').alias('crashes_detected_content_sum'),
F.sum('crashes_detected_gmplugin').alias('crashes_detected_gmplugin_sum'),
F.sum('crashes_detected_plugin').alias('crashes_detected_plugin_sum'),
F.sum('crash_submit_attempt_content').alias(
'crash_submit_attempt_content_sum'),
F.sum('crash_submit_attempt_main').alias('crash_submit_attempt_main_sum'),
F.sum('crash_submit_attempt_plugin').alias(
'crash_submit_attempt_plugin_sum'),
F.sum('crash_submit_success_content').alias(
'crash_submit_success_content_sum'),
F.sum('crash_submit_success_main').alias('crash_submit_success_main_sum'),
F.sum('crash_submit_success_plugin').alias(
'crash_submit_success_plugin_sum'),
F.first('default_search_engine').alias('default_search_engine'),
F.first('default_search_engine_data_load_path').alias(
'default_search_engine_data_load_path'),
F.first('default_search_engine_data_name').alias(
'default_search_engine_data_name'),
F.first('default_search_engine_data_origin').alias(
'default_search_engine_data_origin'),
F.first('default_search_engine_data_submission_url').alias(
'default_search_engine_data_submission_url'),
F.sum('devtools_toolbox_opened_count').alias(
'devtools_toolbox_opened_count_sum'),
F.first('distribution_id').alias('distribution_id'),
# userprefs/dom_ipc_process_count
F.first('e10s_cohort').alias('e10s_cohort'),
F.first('e10s_enabled').alias('e10s_enabled'),
F.first('env_build_arch').alias('env_build_arch'),
F.first('env_build_id').alias('env_build_id'),
F.first('env_build_version').alias('env_build_version'),
# events
# experiments
F.mean('first_paint').alias('first_paint_mean'),
# F.first(
# F.expr("userprefs.extensions_allow_non_mpc_extensions"
# ).alias("extensions_allow_non_mpc_extensions")
# ),
F.first('flash_version').alias('flash_version'),
F.first('install_year').alias('install_year'),
F.first('is_default_browser').alias('is_default_browser'),
F.first('is_wow64').alias('is_wow64'),
F.first('locale').alias('locale'),
# main
F.first('memory_mb').alias('memory_mb'), # mean?
F.first('os').alias('os'),
F.first('os_service_pack_major').alias('os_service_pack_major'),
F.first('os_service_pack_minor').alias('os_service_pack_minor'),
F.first('os_version').alias('os_version'),
F.first('normalized_channel').alias('normalized_channel'),
F.countDistinct('document_id').alias('pings_aggregated_by_this_row'),
F.mean('places_bookmarks_count').alias('places_bookmarks_count_mean'),
F.mean('places_pages_count').alias('places_pages_count_mean'),
F.sum('plugin_hangs').alias('plugin_hangs_sum'),
F.sum('plugins_infobar_allow').alias('plugins_infobar_allow_sum'),
F.sum('plugins_infobar_block').alias('plugins_infobar_block_sum'),
F.sum('plugins_infobar_shown').alias('plugins_infobar_shown_sum'),
F.sum('plugins_notification_shown').alias(
'plugins_notification_shown_sum'),
# plugins_notification_user_action
# popup_notification_stats
F.first('profile_creation_date').alias('profile_creation_date'),
F.sum('push_api_notification_received').alias(
'push_api_notification_received_sum'),
F.sum('push_api_notify').alias('push_api_notify_sum'),
F.first('sample_id').alias('sample_id'),
F.first('scalar_parent_aushelper_websense_reg_version').alias(
'scalar_parent_aushelper_websense_reg_version'),
F.max('scalar_parent_browser_engagement_max_concurrent_tab_count').alias(
'scalar_parent_browser_engagement_max_concurrent_tab_count_max'),
F.max(
'scalar_parent_browser_engagement_max_concurrent_window_count').alias(
'scalar_parent_browser_engagement_max_concurrent_window_count_max'),
# scalar_parent_browser_engagement_navigation_about_home
# scalar_parent_browser_engagement_navigation_about_newtab
# scalar_parent_browser_engagement_navigation_contextmenu
# scalar_parent_browser_engagement_navigation_searchbar
# scalar_parent_browser_engagement_navigation_urlbar
F.sum('scalar_parent_browser_engagement_tab_open_event_count').alias(
'scalar_parent_browser_engagement_tab_open_event_count_sum'),
F.sum('scalar_parent_browser_engagement_total_uri_count').alias(
'scalar_parent_browser_engagement_total_uri_count_sum'),
F.sum('scalar_parent_browser_engagement_unfiltered_uri_count').alias(
'scalar_parent_browser_engagement_unfiltered_uri_count_sum'),
F.max('scalar_parent_browser_engagement_unique_domains_count').alias(
'scalar_parent_browser_engagement_unique_domains_count_max'),
F.sum('scalar_parent_browser_engagement_window_open_event_count').alias(
'scalar_parent_browser_engagement_window_open_event_count_sum'),
# F.sum('scalar_parent_browser_browser_usage_graphite').alias(
# 'scalar_parent_browser_browser_usage_graphite_sum')
F.sum('scalar_parent_devtools_copy_full_css_selector_opened').alias(
'scalar_parent_devtools_copy_full_css_selector_opened_sum'),
F.sum('scalar_parent_devtools_copy_unique_css_selector_opened').alias(
'scalar_parent_devtools_copy_unique_css_selector_opened_sum'),
F.sum('scalar_parent_devtools_toolbar_eyedropper_opened').alias(
'scalar_parent_devtools_toolbar_eyedropper_opened_sum'),
F.sum('scalar_parent_dom_contentprocess_troubled_due_to_memory').alias(
'scalar_parent_dom_contentprocess_troubled_due_to_memory_sum'),
F.sum('scalar_parent_navigator_storage_estimate_count').alias(
'scalar_parent_navigator_storage_estimate_count_sum'),
F.sum('scalar_parent_navigator_storage_persist_count').alias(
'scalar_parent_navigator_storage_persist_count_sum'),
F.first('scalar_parent_services_sync_fxa_verification_method').alias(
'scalar_parent_services_sync_fxa_verification_method'),
F.sum('scalar_parent_storage_sync_api_usage_extensions_using').alias(
'scalar_parent_storage_sync_api_usage_extensions_using_sum'),
# scalar_parent_storage_sync_api_usage_items_stored
# scalar_parent_storage_sync_api_usage_storage_consumed
F.first('scalar_parent_telemetry_os_shutting_down').alias(
'scalar_parent_telemetry_os_shutting_down'),
F.sum('scalar_parent_webrtc_nicer_stun_retransmits').alias(
'scalar_parent_webrtc_nicer_stun_retransmits_sum'),
F.sum('scalar_parent_webrtc_nicer_turn_401s').alias(
'scalar_parent_webrtc_nicer_turn_401s_sum'),
F.sum('scalar_parent_webrtc_nicer_turn_403s').alias(
'scalar_parent_webrtc_nicer_turn_403s_sum'),
F.sum('scalar_parent_webrtc_nicer_turn_438s').alias(
'scalar_parent_webrtc_nicer_turn_438s_sum'),
F.first('search_cohort').alias('search_cohort'),
F.sum('search_count').alias('search_count_sum'),
F.mean('session_restored').alias('session_restored_mean'),
# shutdown_kill
F.sum(F.expr('subsession_length/3600.0')).alias('subsession_hours_sum'),
# ssl_handshake_result
F.sum('ssl_handshake_result_failure').alias(
'ssl_handshake_result_failure_sum'),
F.sum('ssl_handshake_result_success').alias(
'ssl_handshake_result_success_sum'),
F.first('sync_configured').alias('sync_configured'),
F.sum('sync_count_desktop').alias('sync_count_desktop_sum'),
F.sum('sync_count_mobile').alias('sync_count_mobile_sum'),
F.first('telemetry_enabled').alias('telemetry_enabled'),
F.first('timezone_offset').alias('timezone_offset'),
F.sum(F.expr('total_time/3600.0')).alias('total_hours_sum'),
F.first('vendor').alias('vendor'),
F.sum('web_notification_shown').alias('web_notification_shown_sum'),
F.first('windows_build_number').alias('windows_build_number'),
F.first('windows_ubr').alias('windows_ubr'),
]
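mode() is defined above but not referenced by the aggregator list; for reference, a quick check of its behavior, including the tie-break (counts are compared as (count, value) pairs, so a tie goes to the greater value) and the empty-list sentinel.

# Behavior of mode() as defined above. Run with that definition in scope,
# e.g. `from mozetl.clientsdaily.fields import mode` if that module path holds.
assert mode(['x', 'y', 'y']) == 'y'        # simple majority
assert mode(['a', 'b']) == 'b'             # tie: (1, 'a') < (1, 'b'), so 'b' wins
assert mode([]) == 'MISSING'               # empty input returns the default sentinel
assert mode([], empty_value=None) is None  # the sentinel is configurable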

mozetl/clientsdaily/rollup.py (new file)

@@ -0,0 +1,144 @@
import datetime as DT
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import click
from moztelemetry.standards import filter_date_range
from mozetl.utils import (
format_spark_path,
generate_filter_parameters,
delete_from_s3
)
from fields import MAIN_SUMMARY_FIELD_AGGREGATORS
ACTIVITY_SUBMISSION_LAG = DT.timedelta(10)
MAIN_SUMMARY_VERSION = 4
MAIN_SUMMARY_PATH = "s3://telemetry-parquet/main_summary/v{}".format(
MAIN_SUMMARY_VERSION)
WRITE_VERSION = '3'
STORAGE_BUCKET = 'net-mozaws-prod-us-west-2-pipeline-analysis'
STORAGE_PREFIX = '/spenrose/clients-daily/v{}/'.format(WRITE_VERSION)
def load_main_summary(spark):
return (
spark
.read
.option("mergeSchema", "true")
.parquet(MAIN_SUMMARY_PATH)
)
def extract_search_counts(frame):
"""
The result should have exactly as many rows as the input.
"""
two_columns = frame.select(F.col("document_id").alias("did"), "search_counts")
exploded = two_columns.select(
"did", F.explode("search_counts").alias("search_struct"))
unpacked = exploded.select(
"did",
F.expr("search_struct.count").alias("search_count_atom")
)
grouped = unpacked.groupBy("did").agg({"search_count_atom": "sum"})
extracted = grouped.select(
"did", F.col("sum(search_count_atom)").alias("search_count")
)
nulls = two_columns.select(
"did").where(
"search_counts is NULL").select(
"did", F.lit(0).alias("search_count")
)
intermediate = extracted.unionAll(nulls)
result = frame.join(intermediate, frame.document_id == intermediate.did)
return result
def extract_month(first_day, frame):
"""
Pull a month's worth of activity out of frame according to the
heuristics implemented in moztelemetry.standards.
:first_day DT.date(Y, m, 1)
:frame DataFrame homologous with main_summary
"""
month = first_day.month
day_pointer = first_day
while day_pointer.month == month:
day_pointer += DT.timedelta(1)
last_day = day_pointer - DT.timedelta(1)
days_back = (last_day - first_day).days
params = generate_filter_parameters(last_day, days_back)
filtered = filter_date_range(
frame,
frame.subsession_start_date,
params['min_activity_iso'],
params['max_activity_iso'],
frame.submission_date_s3,
params['min_submission_string'],
params['max_submission_string'])
return filtered
def to_profile_day_aggregates(frame_with_extracts):
with_activity_date = frame_with_extracts.select(
"*", F.expr("substr(subsession_start_date, 1, 10)").alias("activity_date")
)
grouped = with_activity_date.groupby('client_id', 'activity_date')
return grouped.agg(*MAIN_SUMMARY_FIELD_AGGREGATORS)
def write_by_activity_day(results, day_pointer,
output_bucket, output_prefix):
month = day_pointer.month
prefix_template = os.path.join(output_prefix, 'activity_date_s3={}')
keys_to_delete = []
while day_pointer.month == month:
isoday = day_pointer.isoformat()
prefix = prefix_template.format(isoday)
output_path = format_spark_path(output_bucket, prefix)
data_for_date = results.where(results.activity_date == isoday)
data_for_date.write.parquet(output_path)
# Remove file that prevents Parquet from rolling up.
keys_to_delete.append(os.path.join(prefix, '_SUCCESS'))
day_pointer += DT.timedelta(1)
delete_from_s3(output_bucket, keys_to_delete)
@click.command()
@click.argument('--date')
@click.option('--input_bucket',
default='telemetry-parquet',
help='Bucket of the input dataset')
@click.option('--input_prefix',
default='main_summary/v4',
help='Prefix of the input dataset')
@click.option('--output_bucket',
default=STORAGE_BUCKET,
help='Bucket of the output dataset')
@click.option('--output_prefix',
default=STORAGE_PREFIX,
help='Prefix of the output dataset')
@click.option('--sample_id',
default=None,
help='Sample_id to restrict results to')
def main(date, input_bucket, input_prefix, output_bucket,
output_prefix, sample_id):
spark = (SparkSession
.builder
.appName("engagement_modeling")
.getOrCreate())
date = DT.datetime.strptime(date, '%Y-%m-%d').date()
date = DT.date(date.year, date.month, 1)
main_summary = load_main_summary(spark)
month_frame = extract_month(date, main_summary)
if sample_id:
clause = "sample_id='{}'".format(sample_id)
month_frame = month_frame.where(clause)
with_searches = extract_search_counts(month_frame)
results = to_profile_day_aggregates(with_searches)
write_by_activity_day(results, date, output_bucket, output_prefix)
if __name__ == '__main__':
main()
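For orientation, a toy version of what extract_search_counts() does to a frame: the search_counts array of structs collapses to one per-ping total, and a null array becomes a count of 0. The rows and engine names below are invented, and only the core explode/sum step is reproduced.

# Toy illustration of the flattening performed by extract_search_counts() above.
from pyspark.sql import Row, SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
frame = spark.createDataFrame([
    Row(document_id="d1",
        search_counts=[Row(engine="engine-a", source="urlbar", count=2),
                       Row(engine="engine-b", source="searchbar", count=1)]),
    Row(document_id="d2", search_counts=None),
])
exploded = frame.select(
    "document_id", F.explode("search_counts").alias("search_struct"))
totals = exploded.groupBy("document_id").agg(
    F.sum("search_struct.count").alias("search_count"))
totals.show()  # d1 -> 3; extract_search_counts maps d2's null array to 0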

File diff not shown because of its large size.

mozetl/maudau/maudau.py

@@ -21,7 +21,7 @@ from moztelemetry.standards import (
count_distinct_clientids
)
from mozetl.utils import upload_file_to_s3
from mozetl import utils as U
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -35,39 +35,14 @@ MAUDAU_SNAPSHOT_TEMPLATE = "engagement_ratio.{}.csv"
DASHBOARD_BUCKET = "net-mozaws-prod-metrics-data"
DASHBOARD_KEY = "firefox-dashboard/{}".format(MAUDAU_ROLLUP_BASENAME)
ACTIVITY_SUBMISSION_LAG = DT.timedelta(10)
# MONTH as in "monthly active"
MONTH = 28
def format_as_submission_date(date):
return DT.date.strftime(date, "%Y%m%d")
def parse_as_submission_date(date_string):
return DT.datetime.strptime(date_string, "%Y%m%d").date()
def format_spark_path(bucket, prefix):
return "s3://{}/{}".format(bucket, prefix)
def get_rollup_s3_paths(basename):
return (STORAGE_BUCKET, os.path.join(STORAGE_SUB_DIR, basename))
def generate_filter_parameters(end_date, days_back):
d = {}
min_activity_date = end_date - DT.timedelta(days_back)
d['min_activity_iso'] = min_activity_date.isoformat()
d['max_activity_iso'] = (end_date + DT.timedelta(1)).isoformat()
d['min_submission_string'] = format_as_submission_date(min_activity_date)
max_submission_date = end_date + ACTIVITY_SUBMISSION_LAG
d['max_submission_string'] = format_as_submission_date(max_submission_date)
return d
def count_active_users(frame, end_date, days_back):
"""
Tally active users according to externally-defined heuristics,
@@ -82,7 +57,7 @@ def count_active_users(frame, end_date, days_back):
See https://bugzilla.mozilla.org/show_bug.cgi?id=1240849
"""
params = generate_filter_parameters(end_date, days_back)
params = U.generate_filter_parameters(end_date, days_back)
filtered = filter_date_range(
frame,
@@ -101,13 +76,13 @@ def parse_last_rollup(basename, start_date=None):
first date that needs to be re-counted.
"""
start_date = start_date or DT.date.today()
since = start_date - ACTIVITY_SUBMISSION_LAG
since = start_date - U.ACTIVITY_SUBMISSION_LAG
carryover = []
with open(basename) as f:
reader = csv.DictReader(f)
last_day = None
for row in reader:
day = parse_as_submission_date(row['day'])
day = U.parse_as_submission_date(row['day'])
if day >= since:
break
if last_day is not None:
@@ -154,13 +129,13 @@ def generate_counts(frame, since, until=None):
narrow = frame.select(cols)
updates = []
today = DT.date.today()
generated = format_as_submission_date(today)
generated = U.format_as_submission_date(today)
start = since
until = until or today
while start < until:
dau = count_active_users(narrow, start, 0)
mau = count_active_users(narrow, start, MONTH)
day = format_as_submission_date(start)
day = U.format_as_submission_date(start)
d = {'day': day, 'dau': dau, 'mau': mau, 'generated_on': generated}
updates.append(d)
start += DT.timedelta(1)
@@ -172,7 +147,7 @@ def write_locally(results):
:results [{'day': '%Y%m%d', 'dau': <int>,
'mau': <int>, 'generated_on': '%Y%m%d'}, ...]
'''
publication_date = format_as_submission_date(DT.date.today())
publication_date = U.format_as_submission_date(DT.date.today())
basename = MAUDAU_SNAPSHOT_TEMPLATE.format(publication_date)
cols = ["day", "dau", "mau", "generated_on"]
with open(basename, 'w') as f:
@@ -188,12 +163,12 @@ def publish_to_s3(s3client, bucket, prefix, basename):
and once to the production dashboard.
'''
dated_key = os.path.join(prefix, basename)
upload_file_to_s3(s3client, basename, bucket, dated_key)
U.upload_file_to_s3(s3client, basename, bucket, dated_key)
latest_key = os.path.join(prefix, MAUDAU_ROLLUP_BASENAME)
upload_file_to_s3(s3client, basename, bucket, latest_key)
U.upload_file_to_s3(s3client, basename, bucket, latest_key)
if DEVELOPMENT:
return
upload_file_to_s3(s3client, basename, DASHBOARD_BUCKET, DASHBOARD_KEY)
U.upload_file_to_s3(s3client, basename, DASHBOARD_BUCKET, DASHBOARD_KEY)
@click.command()
@@ -225,7 +200,7 @@ def main(input_bucket, input_prefix, output_bucket, output_prefix):
.appName("maudau")
.getOrCreate()
)
path = format_spark_path(input_bucket, input_prefix)
path = U.format_spark_path(input_bucket, input_prefix)
logging.info("Loading main_summary from {}".format(path))
main_summary = spark.read.option("mergeSchema", "true").parquet(path)
updates = generate_counts(main_summary, since)

mozetl/schemas.py (new file, 14 lines)

@@ -0,0 +1,14 @@
import ujson as json
import os
import pkg_resources
from pyspark.sql.types import StructType
import mozetl
SCHEMA_DIR = 'json'
MAIN_SUMMARY_SCHEMA_BASENAME = 'main_summary.v4.schema.json'
main_summary_path = os.path.join(SCHEMA_DIR, MAIN_SUMMARY_SCHEMA_BASENAME)
with pkg_resources.resource_stream(mozetl.__name__, main_summary_path) as f:
d = json.load(f)
MAIN_SUMMARY_SCHEMA = StructType.fromJson(d)
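schemas.py exposes the bundled main_summary v4 schema as a Spark StructType so tests can read JSON fixtures without touching the parquet store. A sketch of the intended use, mirroring the test fixture further down; the JSON path here is illustrative and not part of this commit.

# Sketch of how MAIN_SUMMARY_SCHEMA is consumed (hypothetical fixture path).
from pyspark.sql import SparkSession
from mozetl.schemas import MAIN_SUMMARY_SCHEMA

spark = SparkSession.builder.getOrCreate()
rows = spark.read.json("some_main_summary_rows.json", MAIN_SUMMARY_SCHEMA)
rows.select("client_id", "subsession_start_date", "search_counts").show(5)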

@@ -19,9 +19,9 @@ def schema_from_json(path):
:path str: Path to the json data
"""
json_data = pkg_resources.resource_stream(mozetl.topline.__name__, path)
data = json.load(json_data)
return StructType.fromJson(data)
with pkg_resources.resource_stream(mozetl.topline.__name__, path) as f:
data = json.load(f)
return StructType.fromJson(data)
# Generate module level schemas

mozetl/utils.py

@@ -1,4 +1,5 @@
import csv
import datetime as DT
import logging
import os
import shutil
@@ -6,10 +7,36 @@ import tempfile
import boto3
ACTIVITY_SUBMISSION_LAG = DT.timedelta(10)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def format_as_submission_date(date):
return DT.date.strftime(date, "%Y%m%d")
def parse_as_submission_date(date_string):
return DT.datetime.strptime(date_string, "%Y%m%d").date()
def format_spark_path(bucket, prefix):
return "s3://{}/{}".format(bucket, prefix)
def generate_filter_parameters(end_date, days_back):
d = {}
min_activity_date = end_date - DT.timedelta(days_back)
d['min_activity_iso'] = min_activity_date.isoformat()
d['max_activity_iso'] = (end_date + DT.timedelta(1)).isoformat()
d['min_submission_string'] = format_as_submission_date(min_activity_date)
max_submission_date = end_date + ACTIVITY_SUBMISSION_LAG
d['max_submission_string'] = format_as_submission_date(max_submission_date)
return d
def write_csv(dataframe, path, header=True):
""" Write a dataframe to local disk.
@@ -57,3 +84,13 @@ def upload_file_to_s3(client, filepath, bucket, key,
ACL='bucket-owner-full-control'):
with open(filepath, 'rb') as data:
client.put_object(Bucket=bucket, Key=key, Body=data, ACL=ACL)
def delete_from_s3(bucket_name, keys_to_delete):
bucket = boto3.resource('s3').Bucket(bucket_name)
objects = [{'Key': key} for key in keys_to_delete]
response = bucket.delete_objects(Delete={'Objects': objects})
code = response['ResponseMetadata']['HTTPStatusCode']
if code != 200:
msg = "AWS returned {} when attempting to delete {}"
raise RuntimeError(msg.format(code, keys_to_delete))
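The filter helpers moved into utils encode the pipeline's latency assumption: activity for a given day keeps arriving for up to ACTIVITY_SUBMISSION_LAG (10 days) afterwards, so the submission-date window extends 10 days past the activity window. A worked example; the values agree with the unit test further down.

# Worked example of generate_filter_parameters(); values match the test below.
import datetime as DT
from mozetl.utils import generate_filter_parameters

params = generate_filter_parameters(DT.date(2017, 1, 31), 28)
assert params == {
    'min_activity_iso': '2017-01-03',     # end_date - 28 days
    'max_activity_iso': '2017-02-01',     # end_date + 1 day
    'min_submission_string': '20170103',  # same lower bound, formatted %Y%m%d
    'max_submission_string': '20170210',  # end_date + the 10-day submission lag
}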

File diff hidden because one or more lines are too long.

@@ -0,0 +1,66 @@
import datetime as DT
import pytest
import os
from mozetl.schemas import MAIN_SUMMARY_SCHEMA
EXPECTED_INTEGER_VALUES = {
'active_addons_count_mean': 3613,
'crashes_detected_content_sum': 9,
'first_paint_mean': 12802105,
'pings_aggregated_by_this_row': 1122,
'search_count_sum': 1043
}
@pytest.fixture
def make_frame(spark):
root = os.path.dirname(__file__)
path = os.path.join(root, 'resources',
'main_summary-late-may-1123-rows-anonymized.json')
frame = spark.read.json(path, MAIN_SUMMARY_SCHEMA)
return frame
@pytest.fixture
def make_frame_with_extracts(spark):
from mozetl.clientsdaily import rollup
frame = make_frame(spark)
return rollup.extract_search_counts(frame)
def test_extract_search_counts(spark):
from mozetl.clientsdaily import rollup
frame = make_frame(spark)
extracted = rollup.extract_search_counts(frame)
row = extracted.agg({'search_count': 'sum'}).collect()[0]
total = row.asDict().values()[0]
assert total == EXPECTED_INTEGER_VALUES['search_count_sum']
def test_extract_month(spark):
from mozetl.clientsdaily import rollup
frame = make_frame(spark)
month_frame0 = rollup.extract_month(DT.date(2000, 1, 1), frame)
count0 = month_frame0.count()
assert count0 == 0
month_frame1 = rollup.extract_month(DT.date(2017, 6, 1), frame)
count1 = month_frame1.count()
assert count1 == 68
def test_to_profile_day_aggregates(spark):
from mozetl.clientsdaily import rollup
frame = make_frame_with_extracts(spark)
clients_daily = rollup.to_profile_day_aggregates(frame)
# Sum up the means and sums as calculated over 1123 rows,
# one of which is a duplicate.
aggd = dict([(k, 'sum') for k in EXPECTED_INTEGER_VALUES])
result = clients_daily.agg(aggd).collect()[0]
for k, expected in EXPECTED_INTEGER_VALUES.items():
actual = int(result['sum({})'.format(k)])
assert actual == expected

@@ -4,13 +4,14 @@ import pytest
import tempfile
from pyspark.sql.types import StructField, StructType, StringType
from mozetl.maudau import maudau as M
from mozetl.utils import format_as_submission_date
NARROW_SCHEMA = StructType([
StructField("client_id", StringType(), True),
StructField("submission_date_s3", StringType(), False),
StructField("subsession_start_date", StringType(), True)])
generated = M.format_as_submission_date(DT.date.today())
generated = format_as_submission_date(DT.date.today())
@pytest.fixture
@@ -30,29 +31,6 @@ def make_frame(spark):
schema=NARROW_SCHEMA)
def test_generate_filter_parameters():
"""
Check the two meaningful cases: DAU (0 days) and MAU(28 days).
"""
expected0 = {
'min_activity_iso': '2017-01-31',
'max_activity_iso': '2017-02-01',
'min_submission_string': '20170131',
'max_submission_string': '20170210'
}
actual0 = M.generate_filter_parameters(DT.date(2017, 1, 31), 0)
assert expected0 == actual0, str(actual0)
expected28 = {
'min_activity_iso': '2017-01-03',
'max_activity_iso': '2017-02-01',
'min_submission_string': '20170103',
'max_submission_string': '20170210'
}
actual28 = M.generate_filter_parameters(DT.date(2017, 1, 31), 28)
assert expected28 == actual28
def test_generate_counts(spark):
frame = make_frame(spark)
since = DT.date(2017, 5, 1)

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
import datetime as DT
import boto3
import pytest
from moto import mock_s3
@@ -35,6 +35,29 @@ def test_write_csv_ascii(generate_data, tmpdir):
assert data.rstrip().split('\r\n')[1:] == test_data
def test_generate_filter_parameters():
"""
Check the two meaningful cases: DAU (0 days) and MAU(28 days).
"""
expected0 = {
'min_activity_iso': '2017-01-31',
'max_activity_iso': '2017-02-01',
'min_submission_string': '20170131',
'max_submission_string': '20170210'
}
actual0 = utils.generate_filter_parameters(DT.date(2017, 1, 31), 0)
assert expected0 == actual0, str(actual0)
expected28 = {
'min_activity_iso': '2017-01-03',
'max_activity_iso': '2017-02-01',
'min_submission_string': '20170103',
'max_submission_string': '20170210'
}
actual28 = utils.generate_filter_parameters(DT.date(2017, 1, 31), 28)
assert expected28 == actual28
def test_write_csv_valid_unicode(generate_data, tmpdir):
test_data = [u'', u'', u'']
df = generate_data(test_data)