diff --git a/scripts/pg_dump_by_day b/scripts/pg_dump_by_day
index 39e3d95..37dcf72 100755
--- a/scripts/pg_dump_by_day
+++ b/scripts/pg_dump_by_day
@@ -9,7 +9,10 @@ aggregate_type=${AGGREGATE_TYPE:-"submission_date"}
 ds_nodash=${DS_NODASH?Provide a data in YYYYMMDD format}
 output=$data_dir/$aggregate_type/$ds_nodash
 
-if [ -d "$output" ]; then rm -r "$output"; fi
+if [ -d "$output" ]; then
+    echo "dump already exists for $output"
+    exit
+fi
 mkdir -p "$output"
 
 PGPASSWORD=$POSTGRES_PASS pg_dump \
diff --git a/scripts/pg_dump_to_parquet.py b/scripts/pg_dump_to_parquet.py
index dd18a71..d817b22 100644
--- a/scripts/pg_dump_to_parquet.py
+++ b/scripts/pg_dump_to_parquet.py
@@ -47,6 +47,7 @@ AGGREGATE_SCHEMA = T.StringType()
 )
 def main(input_dir, output_dir):
     spark = SparkSession.builder.getOrCreate()
+    spark.conf.set("spark.sql.session.timeZone", "UTC")
 
     df = read_pg_dump(spark, input_dir)
 
@@ -55,6 +56,7 @@ def main(input_dir, output_dir):
         df.withColumn("parts", F.split("table_name", "_"))
         .withColumn("dimension", F.from_json("dimension", DIMENSION_SCHEMA))
         .select(
+            F.current_date().alias("ingest_date"),
             "aggregate_type",
             "ds_nodash",
             # parts[:2] form aggregate_type, this is parsed from the filename
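
Note on the time zone setting: F.current_date() and all timestamp/string conversions are evaluated in spark.sql.session.timeZone, which defaults to the JVM's local zone, so an unpinned job running near midnight can stamp different dates on different hosts. A minimal standalone sketch of what the setting pins down (the sample data here is made up for illustration, not part of this repo):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    # Without this, current_date() and to_timestamp() use the host's
    # local time zone, so results vary by machine.
    spark.conf.set("spark.sql.session.timeZone", "UTC")

    df = spark.createDataFrame([("2021-01-01 00:00:00",)], ["ts"])
    df.select(
        # The string is interpreted in the session time zone (UTC here),
        # so the epoch value is identical on every host.
        F.to_timestamp("ts").cast("long").alias("epoch_seconds"),
        # Evaluated once per query in the session time zone; this is the
        # value the diff writes into the new ingest_date column.
        F.current_date().alias("ingest_date"),
    ).show()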
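
The "parts[:2] form aggregate_type" comment refers to splitting the dump's table name on underscores. A hypothetical illustration of that parsing, assuming a <aggregate>_<type>_<ds_nodash> naming shape; only F.split mirrors the diff, the sample table name and output columns are invented for the example:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    # Hypothetical table name; the real names come from the pg_dump files.
    df = spark.createDataFrame([("submission_date_20210101",)], ["table_name"])

    df.withColumn("parts", F.split("table_name", "_")).select(
        # First two underscore-separated tokens rejoined, e.g. "submission_date".
        F.array_join(F.slice("parts", 1, 2), "_").alias("aggregate_type"),
        # Last token, e.g. "20210101".
        F.element_at("parts", -1).alias("ds_nodash"),
    ).show()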