feat: [DENG-3096] update to correct gcp conn id (#1982)
Parent: d686559bcf
Commit: 9318abeabf
@@ -5,6 +5,7 @@ from datetime import datetime, timedelta
 import pandas as pd
 import requests
 from airflow import DAG
+from airflow.hooks.base_hook import BaseHook
 from airflow.models import Variable
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
@@ -41,6 +42,7 @@ TAGS = [Tag.ImpactTier.tier_3, Tag.Repo.airflow]
 brwsr_usg_configs = {
     "timeout_limit": 2000,
     "device_types": ["DESKTOP", "MOBILE", "OTHER", "ALL"],
+    "max_limit": 20,
     "operating_systems": [
         "ALL",
         "WINDOWS",
@@ -85,27 +87,33 @@ brwsr_usg_configs = {
         "SE",
         "GR",
     ],
-    "user_types": ["ALL"],
+    "user_types": ["ALL", "LIKELY_AUTOMATED", "LIKELY_HUMAN"],
     "bucket": "gs://moz-fx-data-prod-external-data/",
     "results_stg_gcs_fpth": "cloudflare/browser_usage/RESULTS_STAGING/%s_results.csv",
     "results_archive_gcs_fpath": "cloudflare/browser_usage/RESULTS_ARCHIVE/%s_results.csv",
     "errors_stg_gcs_fpth": "cloudflare/browser_usage/ERRORS_STAGING/%s_errors.csv",
     "errors_archive_gcs_fpath": "cloudflare/browser_usage/ERRORS_ARCHIVE/%s_errors.csv",
     "gcp_project_id": "moz-fx-data-shared-prod",
-    "gcp_conn_id": "google_cloud_airflow_dataproc",
+    "gcp_conn_id": "google_cloud_shared_prod",
 }
 
+# Get the connection from the gcp_conn_id to use in storage_options
+gcp_conn_dtl = BaseHook.get_connection(brwsr_usg_configs["gcp_conn_id"])
+extra_json_string = gcp_conn_dtl.get_extra()
+extra_dict = json.loads(extra_json_string)
+gcp_strg_tkn = extra_dict["extra__google_cloud_platform__keyfile_dict"]
+
 
 # Function to generate API call based on configs passed
 def generate_browser_api_call(
-    strt_dt, end_dt, device_type, location, op_system, user_typ
+    strt_dt, end_dt, device_type, location, op_system, user_typ, limit
 ):
     """Create the API url based on the input parameters."""
     user_type_string = "" if user_typ == "ALL" else f"&botClass={user_typ}"
     location_string = "" if location == "ALL" else f"&location={location}"
     op_system_string = "" if op_system == "ALL" else f"&os={op_system}"
     device_type_string = "" if device_type == "ALL" else f"&deviceType={device_type}"
-    browser_api_url = f"https://api.cloudflare.com/client/v4/radar/http/top/browsers?dateStart={strt_dt}T00:00:00.000Z&dateEnd={end_dt}T00:00:00.000Z{device_type_string}{location_string}{op_system_string}{user_type_string}&format=json"
+    browser_api_url = f"https://api.cloudflare.com/client/v4/radar/http/top/browsers?dateStart={strt_dt}T00:00:00.000Z&dateEnd={end_dt}T00:00:00.000Z{device_type_string}{location_string}{op_system_string}{user_type_string}&limit={limit}&format=json"
     return browser_api_url
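As a quick illustration of the updated helper (not part of the diff; the dates and limit below are hypothetical), generate_browser_api_call now folds the limit into the Radar URL, while "ALL" still suppresses the corresponding filter:

browser_api_url = generate_browser_api_call(
    "2024-01-01", "2024-01-02", "DESKTOP", "ALL", "ALL", "LIKELY_HUMAN", 20
)
# -> https://api.cloudflare.com/client/v4/radar/http/top/browsers?dateStart=2024-01-01T00:00:00.000Z&dateEnd=2024-01-02T00:00:00.000Z&deviceType=DESKTOP&botClass=LIKELY_HUMAN&limit=20&format=json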
@@ -123,6 +131,7 @@ def get_browser_data(**kwargs):
     # Configure request headers
     bearer_string = f"Bearer {auth_token}"
     headers = {"Authorization": bearer_string}
+    limit = brwsr_usg_configs["max_limit"]
 
     # Initialize the empty results and errors dataframes
     browser_results_df = pd.DataFrame(
@@ -157,12 +166,12 @@ def get_browser_data(**kwargs):
         for loc in brwsr_usg_configs["locations"]:
             for os in brwsr_usg_configs["operating_systems"]:
                 for user_type in brwsr_usg_configs["user_types"]:
-                    curr_combo = f"Device Type: {device_type}, Location: {loc}, OS: {os}, User Type: {user_type}"
+                    curr_combo = f"DeviceType: {device_type}, Loc: {loc}, OS: {os}, UserType: {user_type}, Limit: {limit}"
                     print(curr_combo)
 
                     # Generate the URL & call the API
                     brwsr_usg_api_url = generate_browser_api_call(
-                        start_date, end_date, device_type, loc, os, user_type
+                        start_date, end_date, device_type, loc, os, user_type, limit
                     )
                     response = requests.get(
                         brwsr_usg_api_url,
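An equivalent way to walk the same parameter grid, shown here only as a sketch (itertools.product flattens the nested loops; this is not part of the commit):

from itertools import product

for device_type, loc, os, user_type in product(
    brwsr_usg_configs["device_types"],
    brwsr_usg_configs["locations"],
    brwsr_usg_configs["operating_systems"],
    brwsr_usg_configs["user_types"],
):
    # Same combinations the nested loops produce, one tuple per API call
    brwsr_usg_api_url = generate_browser_api_call(
        start_date, end_date, device_type, loc, os, user_type, limit
    )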
@@ -232,8 +241,12 @@ def get_browser_data(**kwargs):
         brwsr_usg_configs["bucket"]
         + brwsr_usg_configs["errors_stg_gcs_fpth"] % logical_dag_dt
     )
-    browser_results_df.to_csv(result_fpath, index=False)
-    browser_errors_df.to_csv(error_fpath, index=False)
+    browser_results_df.to_csv(
+        result_fpath, index=False, storage_options={"token": gcp_strg_tkn}
+    )
+    browser_errors_df.to_csv(
+        error_fpath, index=False, storage_options={"token": gcp_strg_tkn}
+    )
     print("Wrote errors to: ", error_fpath)
     print("Wrote results to: ", result_fpath)
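For context on the storage_options change: pandas forwards storage_options to fsspec, so for a gs:// path the token ends up in gcsfs. A minimal standalone sketch, assuming gcsfs is installed and that the token is service-account credentials (the file name and data here are hypothetical):

import json

import pandas as pd

# gcsfs accepts service-account info as a dict under the "token" key
with open("service-account.json") as f:  # hypothetical key file
    token = json.load(f)

df = pd.DataFrame({"browser": ["Firefox"], "share": [0.42]})
df.to_csv(
    "gs://some-bucket/example.csv",  # hypothetical bucket/path
    index=False,
    storage_options={"token": token},
)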
@@ -5,6 +5,7 @@ from datetime import datetime, timedelta
 import pandas as pd
 import requests
 from airflow import DAG
+from airflow.hooks.base_hook import BaseHook
 from airflow.models import Variable
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
@@ -80,9 +81,14 @@ device_usg_configs = {
     "errors_stg_gcs_fpth": "gs://moz-fx-data-prod-external-data/cloudflare/device_usage/ERRORS_STAGING/%s_errors.csv",
     "errors_archive_gcs_fpath": "gs://moz-fx-data-prod-external-data/cloudflare/device_usage/ERRORS_ARCHIVE/%s_errors.csv",
     "gcp_project_id": "moz-fx-data-shared-prod",
-    "gcp_conn_id": "google_cloud_airflow_dataproc",
+    "gcp_conn_id": "google_cloud_shared_prod",
 }
 
+gcp_conn_dtl = BaseHook.get_connection(device_usg_configs["gcp_conn_id"])
+extra_json_string = gcp_conn_dtl.get_extra()
+extra_dict = json.loads(extra_json_string)
+gcp_strg_tkn = extra_dict["extra__google_cloud_platform__keyfile_dict"]
+
 
 def generate_device_type_timeseries_api_call(strt_dt, end_dt, agg_int, location):
     """Calculate API to call based on given parameters."""
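An aside on the extras handling: Airflow's Connection also exposes extra_dejson, which returns the extra field already parsed, so the get_extra()/json.loads pair could likely be collapsed as sketched below (equivalent under the same connection setup; not what the commit does):

gcp_conn_dtl = BaseHook.get_connection(device_usg_configs["gcp_conn_id"])
# extra_dejson parses the connection's extra JSON in one step
gcp_strg_tkn = gcp_conn_dtl.extra_dejson["extra__google_cloud_platform__keyfile_dict"]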
@@ -255,8 +261,10 @@ def get_device_usage_data(**kwargs):
         device_usg_configs["bucket"]
         + device_usg_configs["errors_stg_gcs_fpth"] % logical_dag_dt
     )
-    results_df.to_csv(result_fpath, index=False)
-    errors_df.to_csv(error_fpath, index=False)
+    results_df.to_csv(
+        result_fpath, index=False, storage_options={"token": gcp_strg_tkn}
+    )
+    errors_df.to_csv(error_fpath, index=False, storage_options={"token": gcp_strg_tkn})
     print("Wrote errors to: ", error_fpath)
     print("Wrote results to: ", result_fpath)
@@ -5,6 +5,7 @@ from datetime import datetime, timedelta
 import pandas as pd
 import requests
 from airflow import DAG
+from airflow.hooks.base_hook import BaseHook
 from airflow.models import Variable
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
@@ -81,9 +82,15 @@ os_usg_configs = {
     "errors_stg_gcs_fpth": "gs://moz-fx-data-prod-external-data/cloudflare/os_usage/ERRORS_STAGING/%s_errors.csv",
     "errors_archive_gcs_fpth": "gs://moz-fx-data-prod-external-data/cloudflare/os_usage/ERRORS_ARCHIVE/%s_errors.csv",
     "gcp_project_id": "moz-fx-data-shared-prod",
-    "gcp_conn_id": "google_cloud_airflow_dataproc",
+    "gcp_conn_id": "google_cloud_shared_prod",
 }
 
+# Get the connection from the gcp_conn_id to use in storage_options
+gcp_conn_dtl = BaseHook.get_connection(os_usg_configs["gcp_conn_id"])
+extra_json_string = gcp_conn_dtl.get_extra()
+extra_dict = json.loads(extra_json_string)
+gcp_strg_tkn = extra_dict["extra__google_cloud_platform__keyfile_dict"]
+
 
 # Function to configure the API URL
 def generate_os_timeseries_api_call(strt_dt, end_dt, agg_int, location, device_type):
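For reference, the extra__google_cloud_platform__keyfile_dict extra typically holds a serialized GCP service-account key; an illustrative shape is sketched below (all values are placeholders, nothing here comes from the commit):

# {
#   "type": "service_account",
#   "project_id": "<project-id>",
#   "private_key_id": "<key-id>",
#   "private_key": "-----BEGIN PRIVATE KEY-----\n...",
#   "client_email": "<sa-name>@<project-id>.iam.gserviceaccount.com",
#   ...
# }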
@@ -200,8 +207,8 @@ def get_os_usage_data(**kwargs):
         + os_usg_configs["errors_stg_gcs_fpth"] % logical_dag_dt
     )
 
-    result_df.to_csv(result_fpath, index=False)
-    errors_df.to_csv(errors_fpath, index=False)
+    result_df.to_csv(result_fpath, index=False, storage_options={"token": gcp_strg_tkn})
+    errors_df.to_csv(errors_fpath, index=False, storage_options={"token": gcp_strg_tkn})
     print("Wrote errors to: ", errors_fpath)
     print("Wrote results to: ", result_fpath)
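One small note on the path templates: the %s in the staging/archive paths is filled with the logical DAG date via printf-style string formatting, e.g. (date value hypothetical):

"gs://moz-fx-data-prod-external-data/cloudflare/os_usage/ERRORS_STAGING/%s_errors.csv" % "2024-01-01"
# -> "gs://moz-fx-data-prod-external-data/cloudflare/os_usage/ERRORS_STAGING/2024-01-01_errors.csv"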