Improve aggregation performance.
This commit is contained in:
Parent
ef7dbe99bf
Commit
94b50ac641
|
@ -26,29 +26,24 @@ def _preparedb():
|
|||
conn = _create_connection()
|
||||
cursor = conn.cursor()
|
||||
query = """
|
||||
create or replace function aggregate_arrays(acc bigint[], x jsonb) returns bigint[] as $$
|
||||
create or replace function aggregate_arrays(acc bigint[], x bigint[]) returns bigint[] as $$
|
||||
declare
|
||||
i int;
|
||||
tmp bigint;
|
||||
i int;
|
||||
begin
|
||||
for i in 0 .. json_array_length(x::json) - 1
|
||||
loop
|
||||
if acc[i + 1] is NULL then
|
||||
acc[i + 1] = 0;
|
||||
end if;
|
||||
tmp := x->i;
|
||||
acc[i + 1] := acc[i + 1] + tmp;
|
||||
end loop;
|
||||
return acc;
|
||||
for i in 1 .. array_length(x, 1)
|
||||
loop
|
||||
acc[i] := coalesce(acc[i], 0) + x[i];
|
||||
end loop;
|
||||
return acc;
|
||||
end
|
||||
$$ language plpgsql strict immutable;
|
||||
|
||||
drop aggregate if exists aggregate_histograms(jsonb);
|
||||
create aggregate aggregate_histograms ( jsonb ) (
|
||||
drop aggregate if exists aggregate_histograms(bigint[]);
|
||||
create aggregate aggregate_histograms (bigint[]) (
|
||||
sfunc = aggregate_arrays, stype = bigint[], initcond = '{}'
|
||||
);
|
||||
|
||||
create or replace function add_buildid_metric(channel text, version text, buildid text, dimensions jsonb, histogram jsonb) returns void as $$
|
||||
create or replace function add_buildid_metric(channel text, version text, buildid text, dimensions jsonb, histogram bigint[]) returns void as $$
|
||||
declare
|
||||
tablename text;
|
||||
table_exists bool;
|
||||
|
@ -61,9 +56,10 @@ begin
|
|||
execute 'create table ' || tablename || '(id serial primary key) inherits (telemetry_aggregates_buildid)';
|
||||
execute 'create index on ' || tablename || ' using GIN (dimensions jsonb_path_ops)';
|
||||
end if;
|
||||
|
||||
-- Check if the document already exists and update it, if not create one
|
||||
execute 'with upsert as (update ' || tablename || ' as t
|
||||
set histogram = array_to_json((select aggregate_histograms(v) from (values (1, t.histogram), (2, $1)) as t (k, v)))::jsonb
|
||||
set histogram = (select aggregate_histograms(v) from (values (1, t.histogram), (2, $1)) as t (k, v))
|
||||
where t.dimensions @> $2
|
||||
returning t.*)
|
||||
insert into ' || tablename || ' (dimensions, histogram)
|
||||
|
@ -73,7 +69,7 @@ begin
|
|||
end
|
||||
$$ language plpgsql strict;
|
||||
|
||||
create table if not exists telemetry_aggregates_buildid (dimensions jsonb, histogram jsonb);
|
||||
create table if not exists telemetry_aggregates_buildid (dimensions jsonb, histogram bigint[]);
|
||||
"""
|
||||
|
||||
cursor.execute(query)
|
||||
|
@ -100,7 +96,7 @@ def _get_complete_histogram(metric, values):
|
|||
return map(int, list(histogram))
|
||||
|
||||
|
||||
def _commit_partial_aggregate(cursor, aggregate):
|
||||
def _commit_partial_aggregate_query(cursor, aggregate):
|
||||
key, metrics = aggregate
|
||||
channel, version, build_id, application, architecture, revision, os, os_version = key
|
||||
|
||||
|
@ -123,21 +119,19 @@ def _commit_partial_aggregate(cursor, aggregate):
|
|||
except KeyError as e: # TODO: use revision service once it's ready
|
||||
continue
|
||||
|
||||
cursor.execute("select add_buildid_metric('{}', '{}', '{}', '{}', '{}')".format(channel,
|
||||
version,
|
||||
build_id,
|
||||
json.dumps(dimensions),
|
||||
json.dumps(histogram)))
|
||||
cursor.execute("select add_buildid_metric('{}', '{}', '{}', '{}', array[{}])".format(channel,
|
||||
version, build_id, json.dumps(dimensions),histogram))
|
||||
|
||||
|
||||
def _update_partial_aggregate(aggregate):
|
||||
conn = _create_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
return _commit_partial_aggregate(cursor, aggregate)
|
||||
_commit_partial_aggregate_query(cursor, aggregate)
|
||||
except psycopg2.IntegrityError as e:
|
||||
#see: http://www.postgresql.org/message-id/CA+TgmoZAdYVtwBfp1FL2sMZbiHCWT4UPrzRLNnX1Nb30Ku3-gg@mail.gmail.com
|
||||
return _commit_partial_aggregate(cursor, aggregate)
|
||||
_commit_partial_aggregate_query(cursor, aggregate)
|
||||
|
||||
conn.close()
|
||||
|
||||
|
|
Loading…
Reference in new issue