Move shredder_progress view to monitoring_derived

Anna Scholtz 2021-01-20 13:13:16 -08:00
Parent 1dd4c1d399
Commit fdd37ebee3
2 changed files with 122 additions and 260 deletions
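
The change follows the usual split between a stable user-facing dataset and a versioned derived dataset: the full query body moves to monitoring_derived.shredder_progress_v1, while monitoring.shredder_progress is reduced to a passthrough. Per the first file below, the user-facing view ends up as:

-- Stable user-facing name; no logic of its own.
CREATE OR REPLACE VIEW
  `moz-fx-data-shared-prod.monitoring.shredder_progress`
AS
SELECT
  *
FROM
  `moz-fx-data-shared-prod.monitoring_derived.shredder_progress_v1`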

View file

@@ -1,190 +1,7 @@
CREATE OR REPLACE VIEW
`moz-fx-data-shared-prod.monitoring.shredder_progress`
AS
WITH shredder AS (
SELECT
task_id,
CASE
WHEN
target = "moz-fx-data-shared-prod.telemetry_stable.main_v4"
THEN
"on_demand"
WHEN
target = "moz-fx-data-shared-prod.telemetry_derived.main_summary_v4"
THEN
"flat_rate_main_summary"
ELSE
"flat_rate"
END
AS airflow_task_id,
target,
end_date,
-- oldest table size
ARRAY_AGG(STRUCT(target_bytes, source_bytes) ORDER BY _PARTITIONTIME LIMIT 1)[OFFSET(0)].*,
-- newest job
ARRAY_AGG(
STRUCT(
-- job metadata over 28 days old is not queried
job_created <= TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL 28 DAY) AS job_too_old,
job_created,
SPLIT(job_id, ":")[offset(0)] AS project_id,
SPLIT(job_id, ".")[offset(1)] AS job_id
)
ORDER BY
job_created DESC
LIMIT
1
)[OFFSET(0)].*
FROM
`moz-fx-data-shredder.shredder_state.tasks`
LEFT JOIN
`moz-fx-data-shredder.shredder_state.shredder_state`
USING
(task_id, end_date)
GROUP BY
task_id,
target, -- every task has exactly one target
end_date
),
jobs AS (
-- https://cloud.google.com/bigquery/docs/information-schema-jobs
SELECT
*
FROM
`moz-fx-data-shredder.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
UNION ALL
SELECT
*
FROM
`moz-fx-data-bq-batch-prod.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
),
successful_jobs AS (
SELECT
*,
total_bytes_processed AS bytes_complete,
total_slot_ms AS slot_ms,
FROM
jobs
WHERE
creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL 28 DAY)
AND state = "DONE"
AND error_result IS NULL
),
progress_by_target AS (
SELECT
airflow_task_id,
end_date,
MIN(IFNULL(start_time, job_created)) AS start_time,
MAX(end_time) AS end_time,
-- assume jobs too old for metadata are complete
LOGICAL_AND(job_too_old IS TRUE OR end_time IS NOT NULL) AS complete,
COUNTIF(job_too_old IS TRUE OR end_time IS NOT NULL) AS tasks_complete,
COUNT(*) AS tasks_total,
SUM(bytes_complete) AS bytes_complete,
-- count target_bytes once per table and source_bytes once per task
MIN(target_bytes) + SUM(source_bytes) AS bytes_total,
SUM(slot_ms) AS slot_ms,
NULLIF(SUM(bytes_complete), 0) / SUM(slot_ms) AS bytes_per_slot_ms,
FROM
shredder
LEFT JOIN
successful_jobs
USING
(project_id, job_id)
GROUP BY
target,
airflow_task_id,
end_date
),
avg_bytes_per_slot_ms AS (
SELECT
airflow_task_id,
end_date,
-- avg weighted by bytes_total
SUM(bytes_per_slot_ms * bytes_total) / SUM(bytes_total) AS avg_bytes_per_slot_ms,
FROM
progress_by_target
WHERE
bytes_per_slot_ms IS NOT NULL
GROUP BY
airflow_task_id,
end_date
),
progress AS (
SELECT
airflow_task_id,
end_date,
LOGICAL_AND(complete) AS complete,
SUM(slot_ms) AS slot_ms_complete,
-- estimate slot_ms remaining using bytes_per_slot_ms per table where available
SUM(bytes_total / IFNULL(bytes_per_slot_ms, avg_bytes_per_slot_ms)) AS slot_ms_total,
-- diff in seconds to include fractional days
MIN(start_time) AS start_time,
TIMESTAMP_DIFF(
COALESCE(
MAX(end_time),
-- start_time of the next airflow run when complete
ANY_VALUE(MIN(start_time)) OVER (
PARTITION BY
airflow_task_id
ORDER BY
end_date
ROWS BETWEEN
1 FOLLOWING
AND 1 FOLLOWING
),
-- current time for the last airflow run
CURRENT_TIMESTAMP
),
MIN(start_time),
SECOND
) AS seconds_complete,
SUM(bytes_complete) AS bytes_complete,
SUM(bytes_total) AS bytes_total,
SUM(tasks_complete) AS tasks_complete,
SUM(tasks_total) AS tasks_total,
FROM
progress_by_target
LEFT JOIN
avg_bytes_per_slot_ms AS _
USING
(airflow_task_id, end_date)
GROUP BY
airflow_task_id,
end_date
)
SELECT
STRUCT(airflow_task_id AS task_id, end_date) AS airflow,
start_time,
complete,
IF(
complete,
STRUCT(
TIMESTAMP_ADD(start_time, INTERVAL seconds_complete SECOND) AS actual,
NULL AS estimate_by_slot_ms,
NULL AS estimate_by_bytes,
NULL AS estimate_by_tasks
),
STRUCT(
NULL AS actual,
TIMESTAMP_ADD(
start_time,
INTERVAL CAST(
IFNULL(slot_ms_total / slot_ms_complete, 1) * seconds_complete AS INT64
) SECOND
) AS estimate_by_slot_ms,
TIMESTAMP_ADD(
start_time,
INTERVAL CAST(bytes_total / bytes_complete * seconds_complete AS INT64) SECOND
) AS estimate_by_bytes,
TIMESTAMP_ADD(
start_time,
INTERVAL CAST(tasks_total / tasks_complete * seconds_complete AS INT64) SECOND
) AS estimate_by_tasks
)
) AS completion_time,
slot_ms_total,
bytes_total / POW(2, 50) AS petabytes_total,
tasks_total,
*
FROM
`moz-fx-data-shared-prod.monitoring_derived.shredder_progress`
`moz-fx-data-shared-prod.monitoring_derived.shredder_progress_v1`
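
Both the removed and the relocated query lean on the ARRAY_AGG(STRUCT(...) ORDER BY ... LIMIT 1)[OFFSET(0)].* idiom to keep exactly one row's fields per group (the oldest table-size row, the newest job). A minimal self-contained sketch of the idiom; the table and columns are made up for illustration:

-- Keep the newest metadata row per task_id and splat its fields into
-- top-level columns.
WITH example AS (
  SELECT "a" AS task_id, TIMESTAMP "2021-01-01" AS snapshot_time, 10 AS bytes
  UNION ALL
  SELECT "a", TIMESTAMP "2021-01-02", 20
  UNION ALL
  SELECT "b", TIMESTAMP "2021-01-01", 5
)
SELECT
  task_id,
  ARRAY_AGG(STRUCT(snapshot_time, bytes) ORDER BY snapshot_time DESC LIMIT 1)[OFFSET(0)].*
FROM
  example
GROUP BY
  task_id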

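The COALESCE fallback in the progress CTE reads the next airflow run's start_time via ANY_VALUE(MIN(start_time)) OVER (... ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING): a one-row frame over the following row behaves like LEAD and yields NULL on the last row, at which point CURRENT_TIMESTAMP takes over. A toy illustration of the frame, with made-up dates:

-- Each row picks up the start_time of the next run, NULL for the last one.
WITH runs AS (
  SELECT DATE "2021-01-01" AS end_date, TIMESTAMP "2021-01-02 00:00:00" AS start_time
  UNION ALL
  SELECT DATE "2021-01-15", TIMESTAMP "2021-01-16 00:00:00"
)
SELECT
  end_date,
  ANY_VALUE(start_time) OVER (
    ORDER BY end_date ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING
  ) AS next_start_time
FROM
  runs
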
View file

@@ -1,20 +1,32 @@
CREATE OR REPLACE VIEW
`moz-fx-data-shared-prod.monitoring.shredder_progress`
`moz-fx-data-shared-prod.monitoring_derived.shredder_progress_v1`
AS
WITH max_end_date AS (
SELECT
MAX(end_date) AS end_date
FROM
`moz-fx-data-shredder.shredder_state.shredder_state`
),
shredder_state AS (
WITH shredder AS (
SELECT
task_id,
-- NEWEST job for each task_id
CASE
WHEN
target = "moz-fx-data-shared-prod.telemetry_stable.main_v4"
THEN
"on_demand"
WHEN
target = "moz-fx-data-shared-prod.telemetry_derived.main_summary_v4"
THEN
"flat_rate_main_summary"
ELSE
"flat_rate"
END
AS airflow_task_id,
target,
end_date,
-- oldest table size
ARRAY_AGG(STRUCT(target_bytes, source_bytes) ORDER BY _PARTITIONTIME LIMIT 1)[OFFSET(0)].*,
-- newest job
ARRAY_AGG(
STRUCT(
start_date,
end_date,
-- job metadata over 28 days old is not queried
job_created <= TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL 28 DAY) AS job_too_old,
job_created,
SPLIT(job_id, ":")[offset(0)] AS project_id,
SPLIT(job_id, ".")[offset(1)] AS job_id
)
@@ -23,35 +35,16 @@ shredder_state AS (
LIMIT
1
)[OFFSET(0)].*
FROM
`moz-fx-data-shredder.shredder_state.shredder_state`
-- only for the most recent shredder run
JOIN
max_end_date
USING
(end_date)
GROUP BY
task_id
),
tasks AS (
SELECT
task_id,
ARRAY_AGG(
STRUCT(start_date, end_date, target, target_bytes, source_bytes)
-- already filtered on max_end_date, so use OLDEST metadata
ORDER BY
_PARTITIONTIME
LIMIT
1
)[OFFSET(0)].*
FROM
`moz-fx-data-shredder.shredder_state.tasks`
JOIN
max_end_date
LEFT JOIN
`moz-fx-data-shredder.shredder_state.shredder_state`
USING
(end_date)
(task_id, end_date)
GROUP BY
task_id
task_id,
target, -- every task has exactly one target
end_date
),
jobs AS (
-- https://cloud.google.com/bigquery/docs/information-schema-jobs
@@ -65,19 +58,13 @@ jobs AS (
FROM
`moz-fx-data-bq-batch-prod.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
),
progress_by_task AS (
successful_jobs AS (
SELECT
task_id,
start_date,
end_date,
*,
total_bytes_processed AS bytes_complete,
total_slot_ms AS slot_ms,
FROM
shredder_state
JOIN
jobs
USING
(project_id, job_id)
WHERE
creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL 28 DAY)
AND state = "DONE"
@@ -85,64 +72,122 @@ progress_by_task AS (
),
progress_by_target AS (
SELECT
MAX(end_date) AS end_date,
COUNT(progress_by_task.task_id) AS tasks_complete,
airflow_task_id,
end_date,
MIN(IFNULL(start_time, job_created)) AS start_time,
MAX(end_time) AS end_time,
-- assume jobs too old for metadata are complete
LOGICAL_AND(job_too_old IS TRUE OR end_time IS NOT NULL) AS complete,
COUNTIF(job_too_old IS TRUE OR end_time IS NOT NULL) AS tasks_complete,
COUNT(*) AS tasks_total,
SUM(bytes_complete) AS bytes_complete,
-- don't estimate total bytes if all tasks are complete
IF(
COUNT(progress_by_task.task_id) = COUNT(*),
SUM(bytes_complete),
-- count target_bytes once per table and source_bytes once per task
MIN(target_bytes) + SUM(source_bytes)
) AS bytes_total,
-- count target_bytes once per table and source_bytes once per task
MIN(target_bytes) + SUM(source_bytes) AS bytes_total,
SUM(slot_ms) AS slot_ms,
NULLIF(SUM(bytes_complete), 0) / SUM(slot_ms) AS bytes_per_slot_ms,
FROM
tasks
shredder
LEFT JOIN
progress_by_task
successful_jobs
USING
(task_id, start_date, end_date)
(project_id, job_id)
GROUP BY
target
target,
airflow_task_id,
end_date
),
avg_bytes_per_slot_ms AS (
SELECT AS VALUE
SELECT
airflow_task_id,
end_date,
-- avg weighted by bytes_total
SUM(bytes_per_slot_ms * bytes_total) / SUM(IF(bytes_per_slot_ms IS NOT NULL, bytes_total, NULL))
SUM(bytes_per_slot_ms * bytes_total) / SUM(bytes_total) AS avg_bytes_per_slot_ms,
FROM
progress_by_target
WHERE
bytes_per_slot_ms IS NOT NULL
GROUP BY
airflow_task_id,
end_date
),
progress AS (
SELECT
airflow_task_id,
end_date,
LOGICAL_AND(complete) AS complete,
SUM(slot_ms) AS slot_ms_complete,
SUM(slot_ms) + SUM(
-- estimate time remaining using bytes_per_slot_ms per table where available
(bytes_total - IFNULL(bytes_complete, 0)) / IFNULL(bytes_per_slot_ms, avg_bytes_per_slot_ms)
) AS slot_ms_total,
-- tasks start 1 day after end_date
-- estimate slot_ms remaining using bytes_per_slot_ms per table where available
SUM(bytes_total / IFNULL(bytes_per_slot_ms, avg_bytes_per_slot_ms)) AS slot_ms_total,
-- diff in seconds to include fractional days
MIN(start_time) AS start_time,
TIMESTAMP_DIFF(
current_timestamp,
TIMESTAMP(DATE_ADD(MAX(end_date), INTERVAL 1 DAY)),
MINUTE
) / 60 / 24 AS days_complete,
COALESCE(
MAX(end_time),
-- start_time of the next airflow run when complete
ANY_VALUE(MIN(start_time)) OVER (
PARTITION BY
airflow_task_id
ORDER BY
end_date
ROWS BETWEEN
1 FOLLOWING
AND 1 FOLLOWING
),
-- current time for the last airflow run
CURRENT_TIMESTAMP
),
MIN(start_time),
SECOND
) AS seconds_complete,
SUM(bytes_complete) AS bytes_complete,
SUM(bytes_total) AS bytes_total,
SUM(tasks_complete) AS tasks_complete,
SUM(tasks_total) AS tasks_total,
FROM
progress_by_target
CROSS JOIN
avg_bytes_per_slot_ms
LEFT JOIN
avg_bytes_per_slot_ms AS _
USING
(airflow_task_id, end_date)
GROUP BY
airflow_task_id,
end_date
)
SELECT
slot_ms_complete / slot_ms_total * 100 AS days_percent_complete,
bytes_complete / bytes_total * 100 AS bytes_percent_complete,
tasks_complete / tasks_total * 100 AS tasks_percent_complete,
slot_ms_total / slot_ms_complete * days_complete AS days_total,
bytes_total / 1024 / 1024 / 1024 / 1024 / 1024 AS petabytes_total,
STRUCT(airflow_task_id AS task_id, end_date) AS airflow,
start_time,
complete,
IF(
complete,
STRUCT(
TIMESTAMP_ADD(start_time, INTERVAL seconds_complete SECOND) AS actual,
NULL AS estimate_by_slot_ms,
NULL AS estimate_by_bytes,
NULL AS estimate_by_tasks
),
STRUCT(
NULL AS actual,
TIMESTAMP_ADD(
start_time,
INTERVAL CAST(
IFNULL(slot_ms_total / slot_ms_complete, 1) * seconds_complete AS INT64
) SECOND
) AS estimate_by_slot_ms,
TIMESTAMP_ADD(
start_time,
INTERVAL CAST(bytes_total / bytes_complete * seconds_complete AS INT64) SECOND
) AS estimate_by_bytes,
TIMESTAMP_ADD(
start_time,
INTERVAL CAST(tasks_total / tasks_complete * seconds_complete AS INT64) SECOND
) AS estimate_by_tasks
)
) AS completion_time,
slot_ms_total,
bytes_total / POW(2, 50) AS petabytes_total,
tasks_total,
FROM
progress
ORDER BY
end_date DESC,
airflow_task_id
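
The three completion estimates in the final SELECT are plain linear extrapolations: with seconds_complete seconds elapsed and a completed fraction measured by slot_ms, bytes, or tasks, the projected finish is start_time + total / complete * seconds_complete. A toy check of the arithmetic with made-up numbers, assuming 3 of 4 tasks done after 6 hours:

SELECT
  TIMESTAMP_ADD(
    TIMESTAMP "2021-01-20 00:00:00",
    -- 4 / 3 * 21600 elapsed seconds = 28800, i.e. a projected 8-hour run
    INTERVAL CAST(4 / 3 * 21600 AS INT64) SECOND
  ) AS estimate_by_tasks

The IFNULL(slot_ms_total / slot_ms_complete, 1) guard degrades the slot_ms estimate to "now" when no slot metadata is available yet, since multiplying seconds_complete by 1 just reproduces the elapsed time.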