Add per channel-version filter options.

This commit is contained in:
Roberto Agostino Vitillo 2015-07-26 09:41:37 +00:00
Родитель 11f4bb5865
Коммит adaba780df
4 изменённых файлов: 132 добавлений и 3 удалений

Просмотреть файл

@ -108,13 +108,14 @@ begin
if not table_exists then
execute format('create table %s as table %s', tablename, stage_table);
execute format('create index on %s using GIN (dimensions jsonb_path_ops)', tablename);
perform update_filter_options(channel, version, stage_table);
return;
end if;
-- Update existing tuples and delete matching rows from the staging table
execute 'with merge as (update ' || tablename || ' as dest
set histogram = aggregate_arrays(dest.histogram, src.histogram)
from ' || stage_table || ' as src
set histogram = aggregate_arrays(dest.histogram, src.histogram)
from ' || stage_table || ' as src
where dest.dimensions = src.dimensions
returning dest.*)
delete from ' || stage_table || ' as stage
@ -124,6 +125,7 @@ begin
-- Insert new tuples
execute 'insert into ' || tablename || ' (dimensions, histogram)
select dimensions, histogram from ' || stage_table;
perform update_filter_options(channel, version, stage_table);
end
$$ language plpgsql strict;
@ -268,6 +270,76 @@ end
$$ language plpgsql strict stable;
create or replace function get_dimension_values(filter text, table_name regclass) returns table(option text) as $$
declare
begin
-- TODO: os & osVersion should be merged into a single dimension...
if (filter = 'osVersion') then
return query execute
E'select concat(t.os, \',\', t.version)
from (select distinct dimensions->>\'os\', dimensions->>\'osVersion\'
from ' || table_name || E') as t(os, version)';
else
return query execute
E'select distinct dimensions->>\'' || filter || E'\' from ' || table_name;
end if;
end
$$ language plpgsql strict stable;
create or replace function update_filter_options(channel text, version text, stage_table regclass) returns void as $$
declare
table_match text;
dimension_sample jsonb;
dimension text;
begin
table_match := aggregate_table_name('*', channel, version, '*');
execute 'select dimensions
from ' || stage_table || '
limit 1'
into dimension_sample;
perform lock_transaction('*', channel, version, '*');
for dimension in select jsonb_object_keys(dimension_sample)
loop
if (dimension = 'label' or dimension = 'os') then
continue;
end if;
execute E'with curr as (select value
from filter_options
where table_match = $1 and filter = $2),
new as (select get_dimension_values($2, $3)),
diff as (select * from new except select * from curr)
insert into filter_options (table_match, filter, value)
select $1, $2, t.value
from diff as t(value)'
using table_match, dimension, stage_table;
end loop;
end
$$ language plpgsql strict;
create or replace function get_filter_options(channel text, version text, dimension text) returns table(option text) as $$
declare
match_table text;
begin
match_table := aggregate_table_name('*', channel, version, '*');
if dimension = 'os' then
dimension := 'osVersion';
end if;
return query
select value
from filter_options
where table_match = match_table and filter = dimension;
end
$$ language plpgsql strict;
create or replace function create_tables() returns void as $$
declare
table_exists boolean;
@ -277,6 +349,12 @@ begin
create table table_update_dates (tablename text primary key, submission_dates text[]);
create index on table_update_dates (tablename);
end if;
table_exists := (select exists (select 1 from information_schema.tables where table_schema = 'public' and table_name = 'filter_options'));
if (not table_exists) then
create table filter_options (id serial primary key, table_match text not null, filter text not null, value text not null);
create index on filter_options (table_match);
end if;
end
$$ language plpgsql strict;

Просмотреть файл

@ -105,6 +105,46 @@ def get_filters_options(prefix, channel):
abort(404)
def get_filter_options_new(channel, version, filters, filter):
options = execute_query("select * from get_filter_options(%s, %s, %s)", (channel, version, filter))
if not options or (len(options) == 1 and options[0][0] is None):
return
pretty_opts = []
for option in options:
option = option[0]
if filter == "metric" and option.startswith("[[COUNT]]_"):
pretty_opts.append(option[10:])
else:
pretty_opts.append(option)
filters[filter] = pretty_opts
@app.route('/filters/', methods=["GET"])
@cache_request
def get_filters_options_new():
try:
channel = request.args.get("channel", None)
version = request.args.get("version", None)
if not channel or not version:
abort(404)
filters = {}
dimensions = ["metric", "application", "architecture", "os", "e10sEnabled", "child"]
Parallel(n_jobs=len(dimensions), backend="threading")(delayed(get_filter_options_new)(channel, version, filters, f)
for f in dimensions)
if not filters:
abort(404)
return json.dumps(filters)
except:
abort(404)
@app.route('/aggregates_by/<prefix>/channels/<channel>/', methods=["GET"])
@cache_request
def get_dates_metrics(prefix, channel):

Просмотреть файл

@ -8,7 +8,7 @@
from setuptools import setup
setup(name='python_mozaggregator',
version='0.2.2.1',
version='0.2.2.2',
author='Roberto Agostino Vitillo',
author_email='rvitillo@mozilla.com',
description='Telemetry aggregation job',

Просмотреть файл

@ -96,6 +96,17 @@ def test_filters():
assert set(options["child"]) == set(["true", "false"])
def test_filters_new():
for channel in ping_dimensions["channel"]:
for version in [v.split('.')[0] for v in ping_dimensions["version"]]:
options = requests.get("{}/filters/?channel={}&version={}".format(SERVICE_URI, channel, version)).json()
# We should really test all filters...
assert set(options["application"]) == set(ping_dimensions["application"])
assert set(options["architecture"]) == set(ping_dimensions["arch"])
assert set(options["e10sEnabled"]) == set(["true", "false"])
assert set(options["child"]) == set(["true", "false"])
def test_build_id_metrics():
template_channel = ping_dimensions["channel"]
template_version = [x.split('.')[0] for x in ping_dimensions["version"]]