Merge pull request #3 from mozilla/table_prefix

Table prefix
Frank Bertsch 2019-10-18 06:43:11 -04:00 committed by GitHub
Parents ee6f2c90c3 f26a10f45f
Commit 48a9e2a808
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 86 additions and 36 deletions

View file

@@ -12,6 +12,8 @@ leanplum-data-export export-leanplum \
   --client-key $LEANPLUM_CLIENT_KEY \
   --date 20190101 \
   --bucket gcs-leanplum-export \
+  --table-prefix leanplum \
+  --bq-dataset dev_external \
   --prefix dev
 ```
@@ -31,8 +33,9 @@ make run COMMAND="leanplum-data-export export-leanplum \
   --client-key $LEANPLUM_CLIENT_KEY \
   --date 20190101 \
   --bucket gcs-leanplum-export \
-  --prefix dev \
-  --dataset leanplum"
+  --table-prefix leanplum \
+  --bq-dataset leanplum \
+  --prefix dev"
 ```

 That will create the dataset in BQ, download the files, and make
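
The two new flags shown above compose with `--version` (also added in this PR, default 1) to determine both the GCS layout and the BigQuery table names. A minimal sketch of the resulting scheme, with illustrative values:

```python
# Illustrative values; the naming scheme mirrors the export.py changes below.
table_prefix = "leanplum"        # --table-prefix
version = 1                      # --version (defaults to 1)
date = "20190101"                # --date
bucket = "gcs-leanplum-export"   # --bucket
prefix = "dev"                   # --prefix
data_type = "sessions"           # parsed from the Leanplum export file URI

# BigQuery external table: {table_prefix}_{data_type}_v{version}_{date}
table_name = f"{table_prefix}_{data_type}_v{version}_{date}"
assert table_name == "leanplum_sessions_v1_20190101"

# GCS location the external table reads from
source_uri = f"gs://{bucket}/{prefix}/v{version}/{date}/{data_type}/*"
assert source_uri == "gs://gcs-leanplum-export/dev/v1/20190101/sessions/*"
```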

View file

@@ -18,9 +18,11 @@ from .export import LeanplumExporter
 @click.option("--bucket", required=True)
 @click.option("--prefix", default="")
 @click.option("--bq-dataset", required=True)
-def export_leanplum(app_id, client_key, date, bucket, prefix, bq_dataset):
+@click.option("--table-prefix", default=None)
+@click.option("--version", default=1)
+def export_leanplum(app_id, client_key, date, bucket, prefix, bq_dataset, table_prefix, version):
     exporter = LeanplumExporter(app_id, client_key)
-    exporter.export(date, bucket, prefix, bq_dataset)
+    exporter.export(date, bucket, prefix, bq_dataset, table_prefix, version)

 @click.group()
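
The new options rely on click's usual name mapping: `--table-prefix` arrives as the `table_prefix` parameter, and the `--version` value is coerced to `int` because its default is `1`. A self-contained sketch of that behavior (demo code, not part of the PR):

```python
import click

@click.command()
@click.option("--table-prefix", default=None)  # hyphens map to underscores: table_prefix
@click.option("--version", default=1)          # int default, so "--version 2" parses to int 2
def demo(table_prefix, version):
    click.echo(f"table_prefix={table_prefix!r} version={version!r}")

if __name__ == "__main__":
    demo()  # e.g. python demo.py --table-prefix leanplum --version 2
```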

View file

@@ -13,18 +13,20 @@ class LeanplumExporter(object):
     FINISHED_STATE = "FINISHED"
     DEFAULT_SLEEP_SECONDS = 10
     DEFAULT_EXPORT_FORMAT = "csv"
-    FILENAME_RE = "^https://leanplum_export.storage.googleapis.com/export-.*-([a-z0-9]+)-([0-9]+)$"
+    FILENAME_RE = (r"^https://leanplum_export.storage.googleapis.com"
+                   "/export-.*-output([a-z0-9]+)-([0-9]+)$")

     def __init__(self, app_id, client_key):
         self.app_id = app_id
         self.client_key = client_key
         self.filename_re = re.compile(LeanplumExporter.FILENAME_RE)

-    def export(self, date, bucket, prefix, dataset, export_format=DEFAULT_EXPORT_FORMAT):
+    def export(self, date, bucket, prefix, dataset, table_prefix,
+               version, export_format=DEFAULT_EXPORT_FORMAT):
         job_id = self.init_export(date, export_format)
         file_uris = self.get_files(job_id)
-        tables = self.save_files(file_uris, bucket, prefix, date, export_format)
-        self.create_external_tables(bucket, prefix, date, tables, dataset)
+        tables = self.save_files(file_uris, bucket, prefix, date, export_format, version)
+        self.create_external_tables(bucket, prefix, date, tables, dataset, table_prefix, version)

     def init_export(self, date, export_format):
         export_init_url = (f"http://www.leanplum.com/api"
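
The rewritten `FILENAME_RE` now carries the literal `output` marker, so the first capture group yields a bare data type like `sessions` instead of `outputsessions`. A quick check against a URI shaped like the test fixtures below (the ID segments are made up):

```python
import re

FILENAME_RE = (r"^https://leanplum_export.storage.googleapis.com"
               "/export-.*-output([a-z0-9]+)-([0-9]+)$")

uri = ("https://leanplum_export.storage.googleapis.com/export"
       "-5094741967896576-60c43e66-30fe-4e21-9bbd-563d2749b96f-outputsessions-0")

m = re.match(FILENAME_RE, uri)
assert m.group(1) == "sessions"  # data type, with the "output" marker stripped
assert m.group(2) == "0"         # file index within the export
```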
@@ -67,16 +69,18 @@ class LeanplumExporter(object):
         return response.json()['response'][0]['files']

-    def save_files(self, file_uris, bucket_name, prefix, date, export_format):
+    def save_files(self, file_uris, bucket_name, prefix, date, export_format, version):
         client = storage.Client()
         bucket = client.get_bucket(bucket_name)
         datatypes = set()
+        version_str = f"v{version}"

         if prefix:
-            prefix = self.add_slash_if_not_present(prefix) + date
+            prefix = self.add_slash_if_not_present(prefix) + version_str
         else:
-            prefix = date
+            prefix = version_str
+        prefix += f"/{date}"

         self.delete_gcs_prefix(client, bucket, prefix)

         for uri in file_uris:
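
The net effect of this hunk is that a `v{version}` path segment is inserted between the user prefix and the date. A sketch of the prefix logic, assuming `add_slash_if_not_present` simply guarantees a trailing slash:

```python
def build_gcs_prefix(prefix, version, date):
    # Stand-in for the method above; add_slash_if_not_present is assumed to
    # append "/" only when the prefix doesn't already end with one.
    version_str = f"v{version}"
    if prefix:
        prefix = (prefix if prefix.endswith("/") else prefix + "/") + version_str
    else:
        prefix = version_str
    return prefix + f"/{date}"

assert build_gcs_prefix("dev", 1, "20190101") == "dev/v1/20190101"
assert build_gcs_prefix("", 1, "20190101") == "v1/20190101"
```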
@@ -121,22 +125,30 @@ class LeanplumExporter(object):
         bucket.delete_blobs(blobs)

-    def create_external_tables(self, bucket_name, prefix, date, tables, dataset):
-        gcs_loc = f"gs://{bucket_name}/{prefix}/{date}"
+    def create_external_tables(self, bucket_name, prefix, date, tables,
+                               dataset, table_prefix, version):
+        if table_prefix:
+            table_prefix += "_"
+        else:
+            table_prefix = ""
+
+        gcs_loc = f"gs://{bucket_name}/{prefix}/v{version}/{date}"
         client = bigquery.Client()
         dataset_ref = client.dataset(dataset)

-        for t in tables:
-            table_name = f"{t}_{date}"
+        for leanplum_name in tables:
+            table_name = f"{table_prefix}{leanplum_name}_v{version}_{date}"
+            logging.info(f"Creating table {table_name}")
             table_ref = bigquery.TableReference(dataset_ref, table_name)
             table = bigquery.Table(table_ref)
             client.delete_table(table, not_found_ok=True)

             external_config = bigquery.ExternalConfig('CSV')
-            external_config.source_uris = [f"{gcs_loc}/{t}/*"]
+            external_config.source_uris = [f"{gcs_loc}/{leanplum_name}/*"]
             external_config.autodetect = True
             table.external_data_configuration = external_config
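
Note the fallback branch: when `--table-prefix` is omitted or empty, no leading underscore is added, which is what the updated `test_export` below asserts. A sketch of just the naming branch:

```python
def external_table_name(table_prefix, leanplum_name, version, date):
    # Mirrors the branch above: a falsy table_prefix contributes nothing.
    table_prefix = table_prefix + "_" if table_prefix else ""
    return f"{table_prefix}{leanplum_name}_v{version}_{date}"

assert external_table_name("leanplum", "sessions", 1, "20190101") == "leanplum_sessions_v1_20190101"
assert external_table_name(None, "sessions", 1, "20190101") == "sessions_v1_20190101"
assert external_table_name("", "sessions", 1, "20190101") == "sessions_v1_20190101"
```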

View file

@@ -99,15 +99,15 @@ class TestExporter(object):
             mock_bucket.blob.return_value = mock_blob
             mock_blob.upload_from_filename.side_effect = set_contents

-            exporter.save_files(file_uris, bucket, prefix, date, "json")
+            exporter.save_files(file_uris, bucket, prefix, date, "json", 1)

-            suffix = f"outputsessions/0.json"
+            suffix = f"sessions/0.json"
             mock_client.get_bucket.assert_called_with(bucket)
-            mock_bucket.blob.assert_called_with(f"{prefix}/{date}/{suffix}")
+            mock_bucket.blob.assert_called_with(f"{prefix}/v1/{date}/{suffix}")
             mock_blob.upload_from_filename.assert_called_with(suffix)
             assert self.file_contents == file_body
             assert not os.path.isfile(suffix)
-            assert not os.path.isdir("outputsessions")
+            assert not os.path.isdir("sessions")

     @responses.activate
     def test_save_files_no_prefix(self, exporter):
@@ -139,23 +139,23 @@ class TestExporter(object):
             mock_bucket.blob.return_value = mock_blob
             mock_blob.upload_from_filename.side_effect = set_contents

-            exporter.save_files(file_uris, bucket, prefix, date, "json")
+            exporter.save_files(file_uris, bucket, prefix, date, "json", 1)

-            suffix = f"outputsessions/0.json"
+            suffix = f"sessions/0.json"
             mock_client.get_bucket.assert_called_with(bucket)
-            mock_bucket.blob.assert_called_with(f"{date}/{suffix}")
+            mock_bucket.blob.assert_called_with(f"v1/{date}/{suffix}")
             mock_blob.upload_from_filename.assert_called_with(suffix)
             assert self.file_contents == file_body
             assert not os.path.isfile(suffix)
-            assert not os.path.isdir("outputsessions")
+            assert not os.path.isdir("sessions")

     @responses.activate
     def test_save_files_multiple_uris(self, exporter):
         n_files = 5
         base_uri = ("https://leanplum_export.storage.googleapis.com/export"
-                    "-5094741967896576-60c43e66-30fe-4e21-9bbd-563d2749b96f")
-        file_types = ["outputsessions", "outputexperiments"]
-        file_uris = [f'{base_uri}-{ftype}-{i}' for ftype in file_types for i in range(n_files)]
+                    "-5094741967896576-60c43e66-30fe-4e21-9bbd-563d2749b96f-output")
+        file_types = ["sessions", "experiments"]
+        file_uris = [f'{base_uri}{ftype}-{i}' for ftype in file_types for i in range(n_files)]
         bucket = 'abucket'
         prefix = ''
         file_body = b"data"
@@ -182,14 +182,14 @@ class TestExporter(object):
             mock_bucket.blob.return_value = mock_blob
             mock_blob.upload_from_filename.side_effect = set_contents

-            tables = exporter.save_files(file_uris, bucket, prefix, date, "json")
+            tables = exporter.save_files(file_uris, bucket, prefix, date, "json", 1)

             mock_client.get_bucket.assert_called_with(bucket)
             assert tables == set(file_types)
             for ftype in file_types:
                 for i in range(n_files):
                     suffix = f"{ftype}/{i}.json"
-                    mock_bucket.blob.assert_any_call(f"{date}/{suffix}")
+                    mock_bucket.blob.assert_any_call(f"v1/{date}/{suffix}")
                     mock_blob.upload_from_filename.assert_any_call(suffix)
                     assert self.file_contents == file_body
                     assert not os.path.isfile(suffix)
@@ -212,7 +212,7 @@ class TestExporter(object):
         with patch('leanplum_data_export.export.storage', spec=True) as MockStorage:  # noqa F841
             with pytest.raises(Exception):
-                exporter.save_files(file_uris, bucket, prefix, date, "json")
+                exporter.save_files(file_uris, bucket, prefix, date, "json", 1)

     @responses.activate
     def test_export(self, exporter):
@@ -266,23 +266,23 @@ class TestExporter(object):
                 MockBq.Table.return_value = mock_table
                 MockBq.ExternalConfig.return_value = mock_config

-                exporter.export(date, bucket, prefix, dataset_name)
+                exporter.export(date, bucket, prefix, dataset_name, "", 1)

-                suffix = f"outputsessions/0.csv"
+                suffix = f"sessions/0.csv"
                 mock_client.get_bucket.assert_called_with(bucket)
-                mock_bucket.blob.assert_called_with(f"{prefix}/{date}/{suffix}")
+                mock_bucket.blob.assert_called_with(f"{prefix}/v1/{date}/{suffix}")
                 mock_blob.upload_from_filename.assert_called_with(suffix)
                 assert self.file_contents == file_body
                 assert not os.path.isfile(suffix)
-                assert not os.path.isdir("outputsessions")
+                assert not os.path.isdir("sessions")

                 mock_bq_client.dataset.assert_any_call(dataset_name)
                 mock_bq_client.delete_table.assert_called_with(mock_table, not_found_ok=True)
-                MockBq.TableReference.assert_any_call(mock_dataset_ref, f"outputsessions_{date}")
+                MockBq.TableReference.assert_any_call(mock_dataset_ref, f"sessions_v1_{date}")
                 MockBq.Table.assert_any_call(mock_table_ref)
                 MockBq.ExternalConfig.assert_any_call("CSV")
-                expected_source_uris = [f"gs://{bucket}/{prefix}/{date}/outputsessions/*"]
+                expected_source_uris = [f"gs://{bucket}/{prefix}/v1/{date}/sessions/*"]
                 assert mock_config.source_uris == expected_source_uris
                 assert mock_config.autodetect is True
                 assert mock_table.external_data_configuration == mock_config
@@ -307,3 +307,36 @@ class TestExporter(object):
             with pytest.raises(Exception):
                 exporter.delete_gcs_prefix(client, bucket, prefix)
+
+    def test_created_external_tables(self, exporter):
+        date = "20190101"
+        bucket = 'abucket'
+        prefix = 'aprefix'
+        dataset_name = "leanplum_dataset"
+        tables = ["sessions"]
+        table_prefix = "prefix"
+
+        with patch('leanplum_data_export.export.bigquery', spec=True) as MockBq:
+            mock_bq_client, mock_dataset_ref = Mock(), Mock()
+            mock_table_ref, mock_table, mock_config = Mock(), Mock(), Mock()
+
+            mock_bq_client.dataset.return_value = mock_dataset_ref
+            MockBq.Client.return_value = mock_bq_client
+            MockBq.TableReference.return_value = mock_table_ref
+            MockBq.Table.return_value = mock_table
+            MockBq.ExternalConfig.return_value = mock_config
+
+            exporter.create_external_tables(
+                bucket, prefix, date, tables, dataset_name, table_prefix, 1)
+
+            mock_bq_client.dataset.assert_any_call(dataset_name)
+            mock_bq_client.delete_table.assert_called_with(mock_table, not_found_ok=True)
+            MockBq.TableReference.assert_any_call(mock_dataset_ref,
+                                                  f"{table_prefix}_sessions_v1_{date}")
+            MockBq.Table.assert_any_call(mock_table_ref)
+            MockBq.ExternalConfig.assert_any_call("CSV")
+            expected_source_uris = [f"gs://{bucket}/{prefix}/v1/{date}/sessions/*"]
+            assert mock_config.source_uris == expected_source_uris
+            assert mock_config.autodetect is True
+            assert mock_table.external_data_configuration == mock_config
+            mock_bq_client.create_table.assert_any_call(mock_table)
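
The new test follows the suite's existing pattern of patching the `bigquery` module with `spec=True`, so any attribute the real module lacks raises immediately. A minimal, self-contained illustration of that guard (using `json` as a stand-in module):

```python
import json
from unittest.mock import Mock

# spec constrains the mock to the real module's surface, catching typos
# such as bigquery.Clinet() at test time rather than silently passing.
mock_json = Mock(spec=json)
mock_json.dumps.return_value = "{}"
assert mock_json.dumps({"a": 1}) == "{}"
mock_json.dumps.assert_called_once_with({"a": 1})

try:
    mock_json.not_a_real_attr
except AttributeError:
    pass  # spec rejected an attribute json doesn't have
```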