Add ingest_date to output parquet since aggregates change over time
Parent: 46530e7138
Commit: f9f653bec4
@@ -9,7 +9,10 @@ aggregate_type=${AGGREGATE_TYPE:-"submission_date"}
 ds_nodash=${DS_NODASH?Provide a date in YYYYMMDD format}
 
 output=$data_dir/$aggregate_type/$ds_nodash
-if [ -d "$output" ]; then rm -r "$output"; fi
+if [ -d "$output" ]; then
+    echo "dump already exists for $output"
+    exit
+fi
 mkdir -p "$output"
 
 PGPASSWORD=$POSTGRES_PASS pg_dump \
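This makes reruns non-destructive: instead of deleting and regenerating an existing dump, the script now treats it as final and exits early, since each dump is a snapshot tied to its ingest. A minimal sketch of the same guard in Python, with hypothetical stand-ins for the script's variables:

```python
import sys
from pathlib import Path

data_dir = "/data"                  # assumed stand-in for $data_dir
aggregate_type = "submission_date"  # the script's default
ds_nodash = "20190401"              # assumed DS_NODASH value

output = Path(data_dir) / aggregate_type / ds_nodash
if output.is_dir():
    # The old behavior was the equivalent of shutil.rmtree(output);
    # now an existing dump short-circuits the run instead.
    print(f"dump already exists for {output}")
    sys.exit(0)
output.mkdir(parents=True)
```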
@@ -47,6 +47,7 @@ AGGREGATE_SCHEMA = T.StringType()
 )
 
 def main(input_dir, output_dir):
     spark = SparkSession.builder.getOrCreate()
+    spark.conf.set("spark.sql.session.timeZone", "UTC")
 
     df = read_pg_dump(spark, input_dir)
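Pinning the session time zone matters because `F.current_date()`, used below to stamp `ingest_date`, is resolved against `spark.sql.session.timeZone`; without it, a job running near midnight could record different dates depending on the cluster's locale. A standalone sketch of the dependency (not part of this repo):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# current_date() is evaluated in the session time zone, so fixing it
# to UTC makes the ingest_date stamp deterministic across clusters.
spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.range(1).select(F.current_date().alias("ingest_date")).show()
```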
@@ -55,6 +56,7 @@ def main(input_dir, output_dir):
         df.withColumn("parts", F.split("table_name", "_"))
         .withColumn("dimension", F.from_json("dimension", DIMENSION_SCHEMA))
         .select(
+            F.current_date().alias("ingest_date"),
             "aggregate_type",
             "ds_nodash",
             # parts[:2] form aggregate_type, this is parsed from the filename
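With `ingest_date` in the output, the same (aggregate_type, ds_nodash) aggregate can appear once per ingest, and consumers choose which snapshot to read. A hedged sketch of reading back only the most recent ingest per aggregate; the parquet path and column set are assumptions, not taken from this repo:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/data/parquet")  # hypothetical output_dir

# Keep only rows from the latest ingest of each aggregate, so
# re-ingested (updated) aggregates shadow older snapshots.
w = Window.partitionBy("aggregate_type", "ds_nodash")
latest = (
    df.withColumn("latest_ingest", F.max("ingest_date").over(w))
    .where(F.col("ingest_date") == F.col("latest_ingest"))
    .drop("latest_ingest")
)
```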