Merge pull request #387 from mozilla/repoint_s3togcs_bhr
DENG 1022 - Update bhr_collection DAG to write to GCS instead of S3
This commit is contained in:
Commit dc0be49a84
@@ -23,6 +23,7 @@ from boto3.s3.transfer import S3Transfer
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import array, collect_list
 from pyspark.sql.types import Row
+from google.cloud import storage

 UNSYMBOLICATED = "<unsymbolicated>"
 SYMBOL_TRUNCATE_LENGTH = 200
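Note that the new import is provided by the google-cloud-storage client library, while the boto3 S3Transfer import above it is deliberately left in place: the S3 write path is retained for backwards compatibility, as the comment added to write_file further down spells out.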
@@ -299,7 +300,7 @@ class ProfileProcessor(object):
         root_stack[0] += hang_ms

         last_stack = 0
-        for (func_name, lib_name) in stack:
+        for func_name, lib_name in stack:
             last_stack = prune_stack_cache.key_to_index(
                 (func_name, lib_name, last_stack)
             )
@@ -326,14 +327,14 @@ class ProfileProcessor(object):
         prune_stack_cache = thread["pruneStackCache"]

         last_annotation = None
-        for (name, value) in annotations:
+        for name, value in annotations:
             last_annotation = annotations_table.key_to_index(
                 (last_annotation, name, value)
             )

         last_stack = 0
         last_cache_item_index = 0
-        for (func_name, lib_name) in stack:
+        for func_name, lib_name in stack:
             cache_item_index = prune_stack_cache.key_to_index(
                 (func_name, lib_name, last_cache_item_index)
             )
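The loop changes in this and the previous hunk are purely stylistic: parentheses around a tuple target in a for statement are optional in Python, so both spellings unpack identically. A minimal illustration with hypothetical data, not taken from the job:

    # Hypothetical stack frames; both loop headers behave the same.
    stack = [("main", "libxul.so"), ("nsAppShell::Run", "libxul.so")]
    for (func_name, lib_name) in stack:   # old spelling
        pass
    for func_name, lib_name in stack:     # new spelling, same semantics
        pass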
@@ -1104,6 +1105,7 @@ def read_file(name, config):
         return json.loads(f.read())


+# Retained s3 logic for backwards compatibility only due to the AWS to GCP migration
 def write_file(name, stuff, config):
     end_date = datetime.today()
     end_date_str = end_date.strftime("%Y%m%d")
@@ -1132,12 +1134,24 @@ def write_file(name, stuff, config):
             "bhr/data/hang_aggregates/" + name + "_" + config["uuid"] + ".json"
         )
         transfer.upload_file(gzfilename, bucket, s3_uuid_key, extra_args=extra_args)
+    elif config["use_gcs"]:
+        bucket_name = "moz-fx-data-static-websit-8565-analysis-output"
+        gcs_key = "bhr/data/hang_aggregates/" + name + ".json"
+        if config["uuid"] is not None:
+            gcs_key = (
+                "bhr/data/hang_aggregates/" + name + "_" + config["uuid"] + ".json"
+            )
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(bucket_name)
+        blob = bucket.blob(gcs_key)
+        blob.upload_from_filename(gzfilename)


 default_config = {
     "start_date": datetime.today() - timedelta(days=9),
     "end_date": datetime.today() - timedelta(days=1),
     "use_s3": True,
+    "use_gcs": False,
     "sample_size": 0.50,
     "symbol_server_url": "https://symbols.mozilla.org/",
     "hang_profile_in_filename": "hang_profile_128_16000",
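For reference, the new branch is the standard google-cloud-storage upload pattern. Below is a minimal standalone sketch, assuming application-default credentials are available where the job runs; the bucket name and key layout are copied from the diff, but the upload_to_gcs wrapper is ours, not the repo's:

    from google.cloud import storage

    def upload_to_gcs(gzfilename, name, uuid=None):
        # Bucket and key layout as in the diff above (hypothetical wrapper).
        bucket_name = "moz-fx-data-static-websit-8565-analysis-output"
        gcs_key = "bhr/data/hang_aggregates/" + name + ".json"
        if uuid is not None:
            gcs_key = "bhr/data/hang_aggregates/" + name + "_" + uuid + ".json"
        client = storage.Client()            # uses application-default credentials
        bucket = client.bucket(bucket_name)  # local handle; no API call yet
        blob = bucket.blob(gcs_key)
        blob.upload_from_filename(gzfilename)  # single upload request

Unlike the S3 call, no extra_args/ACL is passed on upload, so object access is left to the bucket's own configuration.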
@@ -1224,7 +1238,8 @@ def etl_job_daily(sc, sql_context, config=None):
     default=0.5,
     help="Proportion of pings to use (1.0 is 100%)",
 )
-def start_job(date, sample_size):
+@click.option("--use_gcs", is_flag=True, default=False)
+def start_job(date, sample_size, use_gcs):
     print(f"Running for {date}")
     print(f"Using sample size {sample_size}")
     etl_job_daily(
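The new option uses click's flag form: is_flag=True turns --use_gcs into a boolean switch that is False unless passed on the command line, and click hands it to start_job as the use_gcs parameter. A stripped-down, hypothetical example of the same pattern:

    import click

    @click.command()
    @click.option("--use_gcs", is_flag=True, default=False)
    def start_job(use_gcs):
        # `python job.py --use_gcs` prints True; omitting the flag prints False.
        click.echo(f"use_gcs={use_gcs}")

    if __name__ == "__main__":
        start_job()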
|
@ -1238,6 +1253,8 @@ def start_job(date, sample_size):
|
|||
"hang_lower_bound": 128,
|
||||
"hang_upper_bound": 65536,
|
||||
"sample_size": sample_size,
|
||||
"use_gcs": use_gcs,
|
||||
"use_s3": not use_gcs,
|
||||
},
|
||||
)
|
||||
|
||||
|