Add destination dataset name to temp table name (fixes #11) (#13)

This commit is contained in:
Ben Wu 2019-12-12 12:36:49 -05:00 коммит произвёл GitHub
Родитель 4bdbd23b68
Коммит ea4ed32763
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 32 добавлений и 19 удалений

Просмотреть файл

@ -31,11 +31,11 @@ class LeanplumExporter(object):
job_id = self.init_export(date, export_format)
file_uris = self.get_files(job_id)
tables = self.save_files(file_uris, bucket, prefix, date, export_format, version)
self.create_external_tables(bucket, prefix, date, tables,
self.TMP_DATASET, table_prefix, version)
self.create_external_tables(bucket, prefix, date, tables, self.TMP_DATASET,
dataset, table_prefix, version)
self.delete_existing_data(dataset, table_prefix, tables, version, date)
self.load_tables(self.TMP_DATASET, dataset, table_prefix, tables, version, date)
self.drop_external_tables(self.TMP_DATASET, table_prefix, tables, version, date)
self.drop_external_tables(self.TMP_DATASET, dataset, table_prefix, tables, version, date)
def init_export(self, date, export_format):
export_init_url = (f"http://www.leanplum.com/api"
@ -132,13 +132,13 @@ class LeanplumExporter(object):
bucket.delete_blobs(list(page))
def create_external_tables(self, bucket_name, prefix, date, tables,
dataset, table_prefix, version):
ext_dataset, dataset, table_prefix, version):
gcs_loc = f"gs://{bucket_name}/{prefix}/v{version}/{date}"
dataset_ref = self.bq_client.dataset(dataset)
dataset_ref = self.bq_client.dataset(ext_dataset)
for leanplum_name in tables:
table_name = self.get_table_name(table_prefix, leanplum_name, version, date)
logging.info(f"Creating external table {dataset}.{table_name}")
table_name = self.get_table_name(table_prefix, leanplum_name, version, date, dataset)
logging.info(f"Creating external table {ext_dataset}.{table_name}")
table_ref = bigquery.TableReference(dataset_ref, table_name)
table = bigquery.Table(table_ref)
@ -169,7 +169,7 @@ class LeanplumExporter(object):
destination_dataset = self.bq_client.dataset(dataset)
for table in tables:
ext_table_name = self.get_table_name(table_prefix, table, version, date)
ext_table_name = self.get_table_name(table_prefix, table, version, date, dataset)
table_name = self.get_table_name(table_prefix, table, version)
destination_table = bigquery.TableReference(destination_dataset, table_name)
@ -198,11 +198,11 @@ class LeanplumExporter(object):
job = self.bq_client.query(sql)
job.result()
def drop_external_tables(self, ext_dataset, table_prefix, tables, version, date):
def drop_external_tables(self, ext_dataset, dataset, table_prefix, tables, version, date):
dataset_ref = self.bq_client.dataset(ext_dataset)
for leanplum_name in tables:
table_name = self.get_table_name(table_prefix, leanplum_name, version, date)
table_name = self.get_table_name(table_prefix, leanplum_name, version, date, dataset)
table_ref = bigquery.TableReference(dataset_ref, table_name)
table = bigquery.Table(table_ref)
@ -217,13 +217,15 @@ class LeanplumExporter(object):
except exceptions.NotFound:
return False
def get_table_name(self, table_prefix, leanplum_name, version, date=None):
def get_table_name(self, table_prefix, leanplum_name, version, date=None, dataset_prefix=None):
if table_prefix:
table_prefix += "_"
else:
table_prefix = ""
name = f"{table_prefix}{leanplum_name}_v{version}"
if dataset_prefix is not None:
name = f"{dataset_prefix}_{name}"
if date is not None:
name += f"_{date}"

Просмотреть файл

@ -280,7 +280,10 @@ class TestExporter(object):
mock_bq_client.dataset.assert_any_call(tmp_dataset)
mock_bq_client.delete_table.assert_any_call(mock_table, not_found_ok=True)
MockBq.TableReference.assert_any_call(mock_dataset_ref, f"sessions_v1_{date}")
MockBq.TableReference.assert_any_call(
mock_dataset_ref,
f"{dataset_name}_sessions_v1_{date}"
)
MockBq.Table.assert_any_call(mock_table_ref)
MockBq.ExternalConfig.assert_any_call("CSV")
@ -301,7 +304,8 @@ class TestExporter(object):
expected_query = (
f"CREATE TABLE `{dataset_name}.sessions_v1` "
f"PARTITION BY load_date AS SELECT * EXCEPT (lat,lon), "
f"PARSE_DATE('%Y%m%d', '{date}') AS load_date FROM `tmp.sessions_v1_{date}`")
f"PARSE_DATE('%Y%m%d', '{date}') AS load_date "
f"FROM `tmp.{dataset_name}_sessions_v1_{date}`")
mock_bq_client.query.assert_any_call(expected_query)
mock_bq_client.delete_table.assert_any_call(mock_table)
@ -371,7 +375,10 @@ class TestExporter(object):
mock_bq_client.dataset.assert_any_call(tmp_dataset)
mock_bq_client.delete_table.assert_any_call(mock_table, not_found_ok=True)
MockBq.TableReference.assert_any_call(mock_dataset_ref, f"sessions_v1_{date}")
MockBq.TableReference.assert_any_call(
mock_dataset_ref,
f"{dataset_name}_sessions_v1_{date}"
)
MockBq.Table.assert_any_call(mock_table_ref)
MockBq.ExternalConfig.assert_any_call("CSV")
@ -391,7 +398,8 @@ class TestExporter(object):
insert_query = (
f"INSERT INTO `{dataset_name}.sessions_v1` SELECT * EXCEPT (lat,lon), "
f"PARSE_DATE('%Y%m%d', '{date}') AS load_date FROM `tmp.sessions_v1_{date}`")
f"PARSE_DATE('%Y%m%d', '{date}') AS load_date "
f"FROM `tmp.{dataset_name}_sessions_v1_{date}`")
mock_bq_client.query.assert_any_call(insert_query)
mock_bq_client.delete_table.assert_any_call(mock_table)
@ -425,6 +433,7 @@ class TestExporter(object):
date = "20190101"
bucket = 'abucket'
prefix = 'aprefix'
ext_dataset_name = "ext_dataset"
dataset_name = "leanplum_dataset"
tables = ["sessions"]
table_prefix = "prefix"
@ -440,12 +449,14 @@ class TestExporter(object):
exporter.bq_client = mock_bq_client
exporter.create_external_tables(
bucket, prefix, date, tables, dataset_name, table_prefix, 1)
bucket, prefix, date, tables, ext_dataset_name, dataset_name, table_prefix, 1)
mock_bq_client.dataset.assert_any_call(dataset_name)
mock_bq_client.dataset.assert_any_call(ext_dataset_name)
mock_bq_client.delete_table.assert_called_with(mock_table, not_found_ok=True)
MockBq.TableReference.assert_any_call(mock_dataset_ref,
f"{table_prefix}_sessions_v1_{date}")
MockBq.TableReference.assert_any_call(
mock_dataset_ref,
f"{dataset_name}_{table_prefix}_sessions_v1_{date}"
)
MockBq.Table.assert_any_call(mock_table_ref)
MockBq.ExternalConfig.assert_any_call("CSV")