python_mozaggregator/mozaggregator/sql.py

query = r"""
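-- Name of the aggregate table for a given prefix, channel, version and date: <prefix>_<channel>_<version>_<date>.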
create or replace function aggregate_table_name(prefix text, channel text, version text, date text) returns text as $$
begin
    return format('%s_%s_%s_%s', prefix, channel, version, date);
end
$$ language plpgsql strict immutable;
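-- Clamp every element of a numeric array into the bigint range, warning when values are truncated.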
create or replace function cast_array_to_bigint_safe(input numeric[]) returns bigint[] as $$
declare
    output numeric[];
begin
    output := input;
    if (select min(x) from unnest(output) as x) < -9223372036854775808 then
        RAISE WARNING 'Truncating negative value(s) too large for bigint in array: %', output;
        output := (select ARRAY(select GREATEST(x, -9223372036854775808) from unnest(output) as x));
    end if;
    if (select max(x) from unnest(output) as x) > 9223372036854775807 then
        RAISE WARNING 'Truncating positive value(s) too large for bigint in array: %', output;
        output := (select ARRAY(select LEAST(x, 9223372036854775807) from unnest(output) as x));
    end if;
    return output;
end
$$ language plpgsql strict immutable;
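-- Element-wise sum of two bigint arrays, with the result clamped to the bigint range.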
create or replace function aggregate_arrays(acc bigint[], x bigint[]) returns bigint[] as $$
begin
    return cast_array_to_bigint_safe(
        (select array(
            select sum(elem)
            from (values (1, acc), (2, x)) as t(idx, arr)
               , unnest(t.arr) with ordinality x(elem, rn)
            group by rn
            order by rn)));
end
$$ language plpgsql strict immutable;
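-- Sum two histogram arrays: the bucket values and the trailing two entries (presumably the packed
-- sum and count fields) are aggregated separately, so the trailing entries stay aligned even when
-- the arrays have different lengths.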
create or replace function aggregate_histogram_arrays(acc bigint[], x bigint[]) returns bigint[] as $$
begin
    return (select (
        aggregate_arrays(x[1 : GREATEST(array_length(x, 1) - 2, 1)],
                         acc[1 : GREATEST(array_length(acc, 1) - 2, 1)])
        ||
        aggregate_arrays(x[GREATEST(array_length(x, 1) - 1, 1) : GREATEST(array_length(x, 1), 1)],
                         acc[GREATEST(array_length(acc, 1) - 1, 1) : GREATEST(array_length(acc, 1), 1)])
    ));
end
$$ language plpgsql strict immutable;
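-- Aggregate that folds a set of histogram arrays together using aggregate_histogram_arrays.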
drop aggregate if exists aggregate_histograms(bigint[]);
create aggregate aggregate_histograms (bigint[]) (
    sfunc = aggregate_histogram_arrays, stype = bigint[], initcond = '{}'
);
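-- Merge a staging table into the aggregate table identified by (prefix, channel, version, date):
-- create the table and its GIN index on dimensions if it does not exist yet, otherwise sum histograms
-- into matching rows and insert the remaining ones, then refresh the filter options.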
create or replace function merge_table(prefix text, channel text, version text, date text, stage_table regclass) returns void as $$
declare
    tablename text;
    table_exists bool;
begin
    tablename := aggregate_table_name(prefix, channel, version, date);
    -- Check if table exists and if not create one
    table_exists := (select exists (select 1 from information_schema.tables where table_schema = 'public' and table_name = tablename));
    if not table_exists then
        execute format('create table %s as table %s', tablename, stage_table);
        execute format('create index on %s using GIN (dimensions jsonb_path_ops)', tablename);
        perform update_filter_options(channel, version, stage_table);
        return;
    end if;
    -- Update existing tuples and delete matching rows from the staging table
    execute 'with merge as (update ' || tablename || ' as dest
                 set histogram = aggregate_histogram_arrays(dest.histogram, src.histogram)
                 from ' || stage_table || ' as src
                 where dest.dimensions = src.dimensions
                 returning dest.*)
             delete from ' || stage_table || ' as stage
             using merge
             where stage.dimensions = merge.dimensions';
    -- Insert new tuples
    execute 'insert into ' || tablename || ' (dimensions, histogram)
             select dimensions, histogram from ' || stage_table;
    perform update_filter_options(channel, version, stage_table);
end
$$ language plpgsql strict;
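-- Take a transaction-scoped advisory lock keyed on the hashed aggregate table name and return the lock key.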
create or replace function lock_transaction(prefix text, channel text, version text, date text) returns bigint as $$
declare
    table_name text;
    lock bigint;
begin
    table_name := aggregate_table_name(prefix, channel, version, date);
    lock := (select h_bigint(table_name));
    execute 'select pg_advisory_xact_lock($1)' using lock;
    return lock;
end
$$ language plpgsql strict;
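-- Hash text into a bigint by reinterpreting the first 16 hex digits of its md5.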
create or replace function h_bigint(text) returns bigint as $$
    select ('x'||substr(md5($1),1,16))::bit(64)::bigint;
$$ language sql;
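-- Create an on-commit-drop temporary staging table for the given aggregate and return its name.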
create or replace function create_temporary_table(prefix text, channel text, version text, date text) returns text as $$
declare
    tablename text;
begin
    tablename := aggregate_table_name('staging_' || prefix, channel, version, date);
    execute 'create temporary table ' || tablename || ' (dimensions jsonb, histogram bigint[]) on commit drop';
    return tablename;
end
$$ language plpgsql strict;
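-- Return whether submission_date was already merged into the aggregate table; if not, record it in
-- table_update_dates (inserting the row when the table has never been updated before).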
create or replace function was_processed(prefix text, channel text, version text, date text, submission_date text) returns boolean as $$
declare
    table_name text;
    was_processed boolean;
begin
    table_name := aggregate_table_name(prefix, channel, version, date);
    select exists(select 1
                  from table_update_dates as t
                  where t.tablename = table_name and submission_date = any(t.submission_dates))
    into was_processed;
    if (was_processed) then
        return was_processed;
    end if;
    with upsert as (update table_update_dates
                    set submission_dates = submission_dates || submission_date
                    where tablename = table_name
                    returning *)
    insert into table_update_dates
    select * from (values (table_name, array[submission_date])) as t
    where not exists(select 1 from upsert);
    return was_processed;
end
$$ language plpgsql strict;
-- We have to explicitly drop the old get_metric function or else the new one will not be backwards compatible
drop function if exists get_metric(text, text, text, text, jsonb);
-- The default value for new_dimensions has to be something that will never match
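-- Return per-label aggregated histograms for rows whose dimensions contain either dimensions or new_dimensions.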
create or replace function get_metric(prefix text, channel text, version text, date text, dimensions jsonb, new_dimensions jsonb DEFAULT '{"metric":"METRIC???"}') returns table(label text, histogram bigint[]) as $$
declare
    tablename text;
begin
    if not dimensions ? 'metric' then
        raise exception 'Missing metric field!';
    end if;
    if not new_dimensions ? 'metric' then
        raise exception 'Missing metric field!';
    end if;
    tablename := aggregate_table_name(prefix, channel, version, date);
    return query execute
    E'select dimensions->>\'label\', aggregate_histograms(histogram)
      from ' || tablename || E'
      where dimensions @> $1
         or dimensions @> $2
      group by dimensions->>\'label\''
    using dimensions, new_dimensions;
end
$$ language plpgsql strict stable;
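-- metric_type mirrors get_metric's row type; batched_get_metric runs get_metric for every date in the array.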
drop type if exists metric_type;
create type metric_type AS (label text, histogram bigint[]);
drop function if exists batched_get_metric(text, text, text, text[], jsonb);
create or replace function batched_get_metric(prefix text, channel text, version text, dates text[], dimensions jsonb, new_dimensions jsonb DEFAULT '{"metric":"METRIC???"}') returns table(date text, label text, histogram bigint[]) as $$
begin
    return query select t.date, (get_metric(prefix, channel, version, t.date, dimensions, new_dimensions)::text::metric_type).*
                 from (select unnest(dates)) as t(date);
end
$$ language plpgsql strict;
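-- Combine a use counter with its denominator metric per date: the returned histogram's first entry is
-- reconstructed from the denominator, since use counters do not record the pages where they never fired.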
create or replace function batched_get_use_counter(prefix text, channel text, version text, dates text[], denominator_dimensions jsonb, denominator_new_dimensions jsonb, dimensions jsonb, new_dimensions jsonb DEFAULT '{"metric":"METRIC???"}') returns table(date text, label text, histogram bigint[]) as $$
begin
    return query
        select t2.date,
               coalesce(t1.label, ''),
               case
                   when t1.histogram is null then ARRAY[
                       t2.histogram[array_length(t2.histogram, 1) - 1],
                       0,
                       0,
                       0,
                       t2.histogram[array_length(t2.histogram, 1)]]
                   when t2.histogram is null then t1.histogram
                   else ARRAY[
                       t2.histogram[array_length(t2.histogram, 1) - 1] - t1.histogram[2] - t1.histogram[3],
                       t1.histogram[2],
                       t1.histogram[3],
                       t1.histogram[4],
                       t1.histogram[5]]
               end
        from batched_get_metric(prefix, channel, version, dates, dimensions, new_dimensions) t1
        full outer join batched_get_metric(prefix, channel, version, dates, denominator_dimensions, denominator_new_dimensions) t2
            on t1.date = t2.date;
end
$$ language plpgsql strict;
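-- List (version, buildid) pairs for a channel by parsing the names of its aggregate tables.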
create or replace function list_buildids(prefix text, channel text) returns table(version text, buildid text) as $$
begin
    return query execute
    E'select t.matches[2], t.matches[3] from
        (select regexp_matches(table_name::text, $3)
         from information_schema.tables
         where table_schema=\'public\' and table_type=\'BASE TABLE\' and table_name like $1 || $2
         order by table_name desc) as t (matches)'
    using prefix, '_' || channel || '%', '^' || prefix || '_([^_]+)_([0-9]+)_([0-9]+)$';
end
$$ language plpgsql strict;
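-- List the distinct channels that have aggregate tables with the given prefix.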
create or replace function list_channels(prefix text) returns table(channel text) as $$
begin
    return query execute
    E'select distinct t.matches[1] from
        (select regexp_matches(table_name::text, $1 || \'_([^_]+)_([0-9]+)_([0-9]+)\')
         from information_schema.tables
         where table_schema=\'public\' and table_type=\'BASE TABLE\'
         order by table_name desc) as t (matches)'
    using prefix;
end
$$ language plpgsql strict;
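-- Distinct values of a dimension in a table; osVersion is returned as 'os,osVersion' pairs.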
create or replace function get_dimension_values(filter text, table_name regclass) returns table(option text) as $$
declare
begin
    -- TODO: os & osVersion should be merged into a single dimension...
    if (filter = 'osVersion') then
        return query execute
        E'select concat(t.os, \',\', t.version)
          from (select distinct dimensions->>\'os\', dimensions->>\'osVersion\'
                from ' || table_name || E') as t(os, version)';
    else
        return query execute
        E'select distinct dimensions->>\'' || filter || E'\' from ' || table_name;
    end if;
end
$$ language plpgsql strict stable;
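-- Insert any dimension values found in the staging table that are not yet in filter_options, under an
-- advisory lock on the channel/version; the label and os dimensions are skipped.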
create or replace function update_filter_options(channel text, version text, stage_table regclass) returns void as $$
declare
    table_match text;
    dimension_sample jsonb;
    dimension text;
begin
    table_match := aggregate_table_name('*', channel, version, '*');
    execute 'select dimensions
             from ' || stage_table || '
             limit 1'
    into dimension_sample;
    perform lock_transaction('*', channel, version, '*');
    for dimension in select jsonb_object_keys(dimension_sample)
    loop
        if (dimension = 'label' or dimension = 'os') then
            continue;
        end if;
        execute E'with curr as (select value
                                from filter_options
                                where table_match = $1 and filter = $2),
                       new as (select get_dimension_values($2, $3)),
                       diff as (select * from new except select * from curr)
                  insert into filter_options (table_match, filter, value)
                  select $1, $2, t.value
                  from diff as t(value)'
        using table_match, dimension, stage_table;
    end loop;
end
$$ language plpgsql strict;
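-- Return the stored filter options for a channel, version and dimension (os is looked up as osVersion).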
create or replace function get_filter_options(channel text, version text, dimension text) returns table(option text) as $$
declare
    match_table text;
begin
    match_table := aggregate_table_name('*', channel, version, '*');
    if dimension = 'os' then
        dimension := 'osVersion';
    end if;
    return query
        select value
        from filter_options
        where table_match = match_table and filter = dimension;
end
$$ language plpgsql strict;
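-- Create the bookkeeping tables (table_update_dates and filter_options) if they do not exist yet.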
create or replace function create_tables() returns void as $$
declare
    table_exists boolean;
begin
    table_exists := (select exists (select 1 from information_schema.tables where table_schema = 'public' and table_name = 'table_update_dates'));
    if (not table_exists) then
        create table table_update_dates (tablename text primary key, submission_dates text[]);
        create index on table_update_dates (tablename);
    end if;
    table_exists := (select exists (select 1 from information_schema.tables where table_schema = 'public' and table_name = 'filter_options'));
    if (not table_exists) then
        create table filter_options (id serial primary key, table_match text not null, filter text not null, value text not null);
        create index on filter_options (table_match);
    end if;
end
$$ language plpgsql strict;
select create_tables();
"""