Remove asn_aggregates ETL (#2580)
This commit is contained in:
Родитель
ec7e68f213
Коммит
6cac92056b
|
@ -2,6 +2,14 @@
|
|||
|
||||
This document records interesting code that we've deleted, so that it remains discoverable in the future.
|
||||
|
||||
## 2021-12 ASN aggregates
|
||||
|
||||
- [Removal PR](https://github.com/mozilla/bigquery-etl/pull/2580)
|
||||
- [Bug](https://mozilla-hub.atlassian.net/browse/DSRE-197)
|
||||
|
||||
This dataset was no longer being actively used, and removing it allows us to
|
||||
limit Airflow's access to `payload_bytes_raw`.
|
||||
|
||||
## 2021-09 Remove document sampling queries
|
||||
|
||||
- [Removal PR](https://github.com/mozilla/bigquery-etl/pull/2389)
|
||||
|
|
|
@ -57,7 +57,7 @@ def query():
|
|||
|
||||
@query.command(
|
||||
help="""Create a new query with name
|
||||
<dataset>.<query_name>, for example: telemetry_derived.asn_aggregates.
|
||||
<dataset>.<query_name>, for example: telemetry_derived.active_profiles.
|
||||
Use the `--project_id` option to change the project the query is added to;
|
||||
default is `moz-fx-data-shared-prod`. Views are automatically generated
|
||||
in the publicly facing dataset.
|
||||
|
|
|
@ -31,7 +31,7 @@ def view():
|
|||
|
||||
@view.command(
|
||||
help="""Create a new view with name
|
||||
<dataset>.<view_name>, for example: telemetry_derived.asn_aggregates.
|
||||
<dataset>.<view_name>, for example: telemetry_derived.active_profiles.
|
||||
Use the `--project_id` option to change the project the view is added to;
|
||||
default is `moz-fx-data-shared-prod`.
|
||||
|
||||
|
|
|
@ -202,7 +202,6 @@ SKIP = {
|
|||
"sql/moz-fx-data-shared-prod/telemetry_derived/glam_client_probe_counts_extract_v1/query.sql", # noqa E501
|
||||
"sql/moz-fx-data-shared-prod/telemetry_derived/scalar_percentiles_v1/query.sql",
|
||||
"sql/moz-fx-data-shared-prod/telemetry_derived/clients_scalar_probe_counts_v1/query.sql", # noqa E501
|
||||
"sql/moz-fx-data-shared-prod/telemetry_derived/asn_aggregates_v1/query.sql",
|
||||
# Dataset sql/glam-fenix-dev:glam_etl was not found
|
||||
*glob.glob("sql/glam-fenix-dev/glam_etl/**/*.sql", recursive=True),
|
||||
# Query templates
|
||||
|
|
10
dags.yaml
10
dags.yaml
|
@ -242,16 +242,6 @@ bqetl_experiments_daily:
|
|||
retries: 2
|
||||
retry_delay: 30m
|
||||
|
||||
bqetl_asn_aggregates:
|
||||
schedule_interval: 0 2 * * *
|
||||
description: The DAG schedules ASN aggregates queries.
|
||||
default_args:
|
||||
owner: ascholtz@mozilla.com
|
||||
start_date: "2020-04-05"
|
||||
email: ["ascholtz@mozilla.com", "tdsmith@mozilla.com"]
|
||||
retries: 2
|
||||
retry_delay: 30m
|
||||
|
||||
# DAG for exporting query data marked as public to GCS
|
||||
# queries should not be explicitly assigned to this DAG (done automatically)
|
||||
bqetl_public_data_json:
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
|
||||
|
||||
from airflow import DAG
|
||||
from operators.task_sensor import ExternalTaskCompletedSensor
|
||||
import datetime
|
||||
from utils.gcp import bigquery_etl_query, gke_command
|
||||
|
||||
docs = """
|
||||
### bqetl_asn_aggregates
|
||||
|
||||
Built from bigquery-etl repo, [`dags/bqetl_asn_aggregates.py`](https://github.com/mozilla/bigquery-etl/blob/main/dags/bqetl_asn_aggregates.py)
|
||||
|
||||
#### Description
|
||||
|
||||
The DAG schedules ASN aggregates queries.
|
||||
#### Owner
|
||||
|
||||
ascholtz@mozilla.com
|
||||
"""
|
||||
|
||||
|
||||
default_args = {
|
||||
"owner": "ascholtz@mozilla.com",
|
||||
"start_date": datetime.datetime(2020, 4, 5, 0, 0),
|
||||
"end_date": None,
|
||||
"email": ["ascholtz@mozilla.com", "tdsmith@mozilla.com"],
|
||||
"depends_on_past": False,
|
||||
"retry_delay": datetime.timedelta(seconds=1800),
|
||||
"email_on_failure": True,
|
||||
"email_on_retry": True,
|
||||
"retries": 2,
|
||||
}
|
||||
|
||||
with DAG(
|
||||
"bqetl_asn_aggregates",
|
||||
default_args=default_args,
|
||||
schedule_interval="0 2 * * *",
|
||||
doc_md=docs,
|
||||
) as dag:
|
||||
|
||||
telemetry_derived__asn_aggregates__v1 = bigquery_etl_query(
|
||||
task_id="telemetry_derived__asn_aggregates__v1",
|
||||
destination_table="asn_aggregates_v1",
|
||||
dataset_id="telemetry_derived",
|
||||
project_id="moz-fx-data-shared-prod",
|
||||
owner="tdsmith@mozilla.com",
|
||||
email=["ascholtz@mozilla.com", "tdsmith@mozilla.com"],
|
||||
date_partition_parameter="submission_date",
|
||||
depends_on_past=False,
|
||||
parameters=["n_clients:INT64:500"],
|
||||
dag=dag,
|
||||
)
|
||||
|
||||
wait_for_bq_main_events = ExternalTaskCompletedSensor(
|
||||
task_id="wait_for_bq_main_events",
|
||||
external_dag_id="copy_deduplicate",
|
||||
external_task_id="bq_main_events",
|
||||
execution_delta=datetime.timedelta(seconds=3600),
|
||||
check_existence=True,
|
||||
mode="reschedule",
|
||||
pool="DATA_ENG_EXTERNALTASKSENSOR",
|
||||
)
|
||||
|
||||
telemetry_derived__asn_aggregates__v1.set_upstream(wait_for_bq_main_events)
|
|
@ -1,7 +0,0 @@
|
|||
CREATE OR REPLACE VIEW
|
||||
`moz-fx-data-shared-prod.telemetry.asn_aggregates`
|
||||
AS
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.telemetry_derived.asn_aggregates_v1`
|
|
@ -1,21 +0,0 @@
|
|||
---
|
||||
friendly_name: ASN Aggregates
|
||||
description: |-
|
||||
A daily aggregate of clients per ASN (Autonomous System Number) and
|
||||
DoH (DNS over HTTPS) usage, partitioned by day. Only ASNs with more
|
||||
than a critical threshold of clients are considered.
|
||||
owners:
|
||||
- tdsmith@mozilla.com
|
||||
- ascholtz@mozilla.com
|
||||
labels:
|
||||
incremental: true
|
||||
schedule: daily
|
||||
scheduling:
|
||||
dag_name: bqetl_asn_aggregates
|
||||
parameters: ["n_clients:INT64:500"]
|
||||
referenced_tables:
|
||||
- ["moz-fx-data-shared-prod", "static", "isp_blocks_ipv4_20200407"]
|
||||
- ["moz-fx-data-shared-prod", "telemetry_derived", "events_v1"]
|
||||
- ["moz-fx-data-shared-prod", "telemetry_derived", "main_events_v1"]
|
||||
- ["moz-fx-data-shared-prod", "telemetry_derived", "events_events_v1"]
|
||||
- ["moz-fx-data-shared-prod", "payload_bytes_raw", "telemetry"]
|
|
@ -1,129 +0,0 @@
|
|||
|
||||
-- SELECT autonomous_system_number, network
|
||||
-- FROM `static.geoip2_isp_blocks_ipv4`
|
||||
-- WHERE NET.IP_TRUNC(NET.SAFE_IP_FROM_STRING("100.1.0.255"), CAST(SPLIT(network, "/")[OFFSET(1)] AS INT64)) = NET.SAFE_IP_FROM_STRING(SPLIT(network, "/")[OFFSET(0)])
|
||||
CREATE TEMPORARY FUNCTION get_client_ip(xff STRING, remote_address STRING, pipeline_proxy STRING)
|
||||
RETURNS STRING DETERMINISTIC
|
||||
LANGUAGE js
|
||||
AS
|
||||
"""
|
||||
// X-Forwarded-For is a list of IP addresses
|
||||
if (xff != null) {
|
||||
// Google's load balancer will append the immediate sending client IP and a global
|
||||
// forwarding rule IP to any existing content in X-Forwarded-For as documented in:
|
||||
// https://cloud.google.com/load-balancing/docs/https/#components
|
||||
//
|
||||
// In practice, many of the "first" addresses are bogus or internal,
|
||||
// so we target the immediate sending client IP.
|
||||
ips = xff.split(",");
|
||||
|
||||
if (pipeline_proxy != null) {
|
||||
// Drop extra IP from X-Forwarded-For
|
||||
ips = ips.slice(0, -1);
|
||||
}
|
||||
|
||||
ip = ips[Math.max(ips.length - 2, 0)].trim();
|
||||
} else {
|
||||
ip = "";
|
||||
if (remote_address != null) {
|
||||
ip = remote_address;
|
||||
}
|
||||
}
|
||||
|
||||
return ip;
|
||||
""";
|
||||
|
||||
--
|
||||
WITH asn_ip_address_range AS (
|
||||
SELECT
|
||||
NET.SAFE_IP_FROM_STRING(SPLIT(network, "/")[OFFSET(0)]) AS network_ip,
|
||||
CAST(SPLIT(network, "/")[OFFSET(1)] AS INT64) AS mask,
|
||||
autonomous_system_number
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.static.isp_blocks_ipv4_20200407`
|
||||
),
|
||||
events_with_doh AS (
|
||||
-- Get event data with DoH information.
|
||||
SELECT
|
||||
submission_date,
|
||||
document_id,
|
||||
client_id,
|
||||
event_object,
|
||||
event_category,
|
||||
udf.get_key(event_map_values, 'canary') AS canary,
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.telemetry.events` AS events
|
||||
WHERE
|
||||
submission_date = @submission_date
|
||||
),
|
||||
events_with_ip AS (
|
||||
-- Get IP addresses and client data.
|
||||
SELECT
|
||||
submission_date,
|
||||
client_id,
|
||||
canary,
|
||||
event_category,
|
||||
event_object,
|
||||
NET.SAFE_IP_FROM_STRING(ip_address) AS ip_address
|
||||
FROM
|
||||
events_with_doh AS events
|
||||
LEFT JOIN
|
||||
(
|
||||
SELECT
|
||||
submission_timestamp,
|
||||
get_client_ip(x_forwarded_for, remote_addr, x_pipeline_proxy) AS ip_address,
|
||||
udf.parse_desktop_telemetry_uri(uri).document_id AS document_id
|
||||
FROM
|
||||
`moz-fx-data-shared-prod.payload_bytes_raw.telemetry`
|
||||
WHERE
|
||||
DATE(submission_timestamp) = @submission_date
|
||||
) AS payload_bytes_raw
|
||||
ON
|
||||
payload_bytes_raw.document_id = events.document_id
|
||||
),
|
||||
events_with_asn AS (
|
||||
-- Lookup ASNs for IP addresses.
|
||||
SELECT DISTINCT
|
||||
submission_date,
|
||||
client_id,
|
||||
canary,
|
||||
event_category,
|
||||
event_object,
|
||||
autonomous_system_number
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
*,
|
||||
ip_address & NET.IP_NET_MASK(4, mask) network_ip
|
||||
FROM
|
||||
events_with_ip,
|
||||
UNNEST(GENERATE_ARRAY(9, 32)) mask
|
||||
WHERE
|
||||
BYTE_LENGTH(ip_address) = 4
|
||||
)
|
||||
JOIN
|
||||
asn_ip_address_range
|
||||
USING
|
||||
(network_ip, mask)
|
||||
)
|
||||
SELECT
|
||||
submission_date,
|
||||
autonomous_system_number,
|
||||
COUNT(DISTINCT client_id) AS n_clients,
|
||||
COUNTIF(
|
||||
event_category LIKE 'doh'
|
||||
AND event_object LIKE 'heuristics'
|
||||
AND canary LIKE 'enable_doh'
|
||||
) AS doh_enabled,
|
||||
COUNTIF(
|
||||
event_category LIKE 'doh'
|
||||
AND event_object LIKE 'heuristics'
|
||||
AND canary LIKE 'disable_doh'
|
||||
) AS doh_disabled
|
||||
FROM
|
||||
events_with_asn
|
||||
GROUP BY
|
||||
submission_date,
|
||||
autonomous_system_number
|
||||
HAVING
|
||||
n_clients > @n_clients
|
Загрузка…
Ссылка в новой задаче