Bug 1596917 - Skip reading credentials file if it does not exist
This commit is contained in:
Parent 52ecf7755e
Commit b497a47006
@@ -21,8 +21,8 @@ def entry_point():
 @click.option(
     "--credentials-protocol", type=click.Choice(["file", "s3", "gcs"]), default="s3"
 )
-@click.option("--credentials-bucket", type=str, required=True)
-@click.option("--credentials-prefix", type=str, required=True)
+@click.option("--credentials-bucket", type=str, required=False)
+@click.option("--credentials-prefix", type=str, required=False)
 @click.option("--num-partitions", type=int, default=10000)
 @click.option(
     "--source",
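The two options above switch from required=True to required=False. With click, an omitted
non-required option reaches the command callback as None, which is what the truthiness check
in the next hunk relies on. A minimal self-contained sketch of that behavior (the command
name "demo" is illustrative, not the project's actual entry point):

import click

# Sketch only: with required=False, an omitted option arrives as None, so a
# plain truthiness check detects that no credentials location was supplied.
@click.command()
@click.option("--credentials-bucket", type=str, required=False)
@click.option("--credentials-prefix", type=str, required=False)
def demo(credentials_bucket, credentials_prefix):
    if credentials_bucket and credentials_prefix:
        click.echo(f"would read {credentials_bucket}/{credentials_prefix}")
    else:
        click.echo("assuming credentials from the environment")

if __name__ == "__main__":
    demo()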
@@ -49,18 +49,20 @@ def run_aggregator(
     spark = SparkSession.builder.getOrCreate()

     # Mozaggregator expects a series of POSTGRES_* variables in order to connect
-    # to a db instance; we pull them into the environment now by reading an
+    # to a db instance; we may pull them into the environment now by reading an
     # object from a file system.
     def create_path(protocol, bucket, prefix):
         mapping = {"file": "file", "s3": "s3a", "gcs": "gs"}
-        return "{protocol}://{bucket}/{prefix}".format(
-            protocol=mapping[protocol], bucket=bucket, prefix=prefix
-        )
+        return f"{mapping[protocol]}://{bucket}/{prefix}"

-    path = create_path(credentials_protocol, credentials_bucket, credentials_prefix)
-    creds = spark.read.json(path, multiLine=True).first().asDict()
-    for k, v in creds.items():
-        environ[k] = v
+    if credentials_bucket and credentials_prefix:
+        path = create_path(credentials_protocol, credentials_bucket, credentials_prefix)
+        print(f"reading credentials from {path}")
+        creds = spark.read.json(path, multiLine=True).first().asDict()
+        for k, v in creds.items():
+            environ[k] = v
+    else:
+        print(f"assuming credentials from the environment")

     # Attempt a database connection now so we can fail fast if credentials are broken.
     db._preparedb()
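For illustration, the new conditional credential load can be exercised standalone. This is
a sketch assuming a hypothetical bucket and prefix and a single-object, multi-line JSON
file of POSTGRES_* settings; the field names are not specified by the diff:

from os import environ
from pyspark.sql import SparkSession

def create_path(protocol, bucket, prefix):
    # Map the CLI protocol names to the URL schemes Spark's readers expect.
    mapping = {"file": "file", "s3": "s3a", "gcs": "gs"}
    return f"{mapping[protocol]}://{bucket}/{prefix}"

assert create_path("s3", "my-bucket", "creds.json") == "s3a://my-bucket/creds.json"
assert create_path("gcs", "my-bucket", "creds.json") == "gs://my-bucket/creds.json"

# "my-bucket" and "creds.json" are hypothetical; any reachable object works.
spark = SparkSession.builder.getOrCreate()
path = create_path("s3", "my-bucket", "creds.json")
# Read one multi-line JSON object and export each field as an env variable,
# exactly as the hunk above does.
creds = spark.read.json(path, multiLine=True).first().asDict()
for k, v in creds.items():
    environ[k] = v  # e.g. POSTGRES_HOST, POSTGRES_USER, ...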
@@ -133,14 +135,18 @@ def run_parquet(
 )
 @click.option("--num-partitions", type=int, default=10000)
 @click.option(
-    "--source", type=click.Choice(["bigquery", "moztelemetry", "avro"]), default="moztelemetry"
+    "--source",
+    type=click.Choice(["bigquery", "moztelemetry", "avro"]),
+    default="moztelemetry",
 )
 @click.option(
     "--project-id", envvar="PROJECT_ID", type=str, default="moz-fx-data-shared-prod"
 )
 @click.option("--dataset-id", type=str, default="payload_bytes_decoded")
 @click.option("--avro-prefix", type=str)
-def run_mobile(date, output, num_partitions, source, project_id, dataset_id, avro_prefix):
+def run_mobile(
+    date, output, num_partitions, source, project_id, dataset_id, avro_prefix
+):
     spark = SparkSession.builder.getOrCreate()
     spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
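The db._preparedb() call kept as context in the second hunk is what makes a bad credential
set fail fast, before any aggregation work starts. Its body is not shown in this diff; a
hypothetical equivalent, assuming psycopg2 and assumed POSTGRES_* variable names, might be:

from os import environ
import psycopg2  # assumed driver; mozaggregator's actual helper is not shown here

def fail_fast_connect():
    # Variable names are illustrative: the diff only says "a series of
    # POSTGRES_* variables" without naming them.
    conn = psycopg2.connect(
        host=environ["POSTGRES_HOST"],
        user=environ["POSTGRES_USER"],
        password=environ["POSTGRES_PASS"],
        dbname=environ["POSTGRES_DB"],
    )
    conn.close()  # connecting at all is the fail-fast check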