From 94b50ac641b5194722169b8d84508c8ec8d1f564 Mon Sep 17 00:00:00 2001 From: Roberto Agostino Vitillo Date: Thu, 28 May 2015 23:02:45 +0000 Subject: [PATCH] Improve aggregation performance. --- mozaggregator/db_updater.py | 44 ++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/mozaggregator/db_updater.py b/mozaggregator/db_updater.py index ecc074d..af8556b 100644 --- a/mozaggregator/db_updater.py +++ b/mozaggregator/db_updater.py @@ -26,29 +26,24 @@ def _preparedb(): conn = _create_connection() cursor = conn.cursor() query = """ -create or replace function aggregate_arrays(acc bigint[], x jsonb) returns bigint[] as $$ +create or replace function aggregate_arrays(acc bigint[], x bigint[]) returns bigint[] as $$ declare - i int; - tmp bigint; + i int; begin -for i in 0 .. json_array_length(x::json) - 1 -loop - if acc[i + 1] is NULL then - acc[i + 1] = 0; - end if; - tmp := x->i; - acc[i + 1] := acc[i + 1] + tmp; -end loop; -return acc; + for i in 1 .. array_length(x, 1) + loop + acc[i] := coalesce(acc[i], 0) + x[i]; + end loop; + return acc; end $$ language plpgsql strict immutable; -drop aggregate if exists aggregate_histograms(jsonb); -create aggregate aggregate_histograms ( jsonb ) ( +drop aggregate if exists aggregate_histograms(bigint[]); +create aggregate aggregate_histograms (bigint[]) ( sfunc = aggregate_arrays, stype = bigint[], initcond = '{}' ); -create or replace function add_buildid_metric(channel text, version text, buildid text, dimensions jsonb, histogram jsonb) returns void as $$ +create or replace function add_buildid_metric(channel text, version text, buildid text, dimensions jsonb, histogram bigint[]) returns void as $$ declare tablename text; table_exists bool; @@ -61,9 +56,10 @@ begin execute 'create table ' || tablename || '(id serial primary key) inherits (telemetry_aggregates_buildid)'; execute 'create index on ' || tablename || ' using GIN (dimensions jsonb_path_ops)'; end if; + -- Check if the document already exists and update it, if not create one execute 'with upsert as (update ' || tablename || ' as t - set histogram = array_to_json((select aggregate_histograms(v) from (values (1, t.histogram), (2, $1)) as t (k, v)))::jsonb + set histogram = (select aggregate_histograms(v) from (values (1, t.histogram), (2, $1)) as t (k, v)) where t.dimensions @> $2 returning t.*) insert into ' || tablename || ' (dimensions, histogram) @@ -73,7 +69,7 @@ begin end $$ language plpgsql strict; -create table if not exists telemetry_aggregates_buildid (dimensions jsonb, histogram jsonb); +create table if not exists telemetry_aggregates_buildid (dimensions jsonb, histogram bigint[]); """ cursor.execute(query) @@ -100,7 +96,7 @@ def _get_complete_histogram(metric, values): return map(int, list(histogram)) -def _commit_partial_aggregate(cursor, aggregate): +def _commit_partial_aggregate_query(cursor, aggregate): key, metrics = aggregate channel, version, build_id, application, architecture, revision, os, os_version = key @@ -123,21 +119,19 @@ def _commit_partial_aggregate(cursor, aggregate): except KeyError as e: # TODO: use revision service once it's ready continue - cursor.execute("select add_buildid_metric('{}', '{}', '{}', '{}', '{}')".format(channel, - version, - build_id, - json.dumps(dimensions), - json.dumps(histogram))) + cursor.execute("select add_buildid_metric('{}', '{}', '{}', '{}', array[{}])".format(channel, + version, build_id, json.dumps(dimensions),histogram)) + def _update_partial_aggregate(aggregate): conn = _create_connection() cursor = conn.cursor() try: - return _commit_partial_aggregate(cursor, aggregate) + _commit_partial_aggregate_query(cursor, aggregate) except psycopg2.IntegrityError as e: #see: http://www.postgresql.org/message-id/CA+TgmoZAdYVtwBfp1FL2sMZbiHCWT4UPrzRLNnX1Nb30Ku3-gg@mail.gmail.com - return _commit_partial_aggregate(cursor, aggregate) + _commit_partial_aggregate_query(cursor, aggregate) conn.close()