Create export script
This commit is contained in:
Parent: d870f332b9
Commit: a7f21539d2
@@ -0,0 +1,195 @@
import argparse
import datetime
import logging
import time
from typing import List

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import bigquery_datatransfer
from google.cloud.bigquery_datatransfer import enums as transfer_enums

# limit based on docs
# https://cloud.google.com/bigquery-transfer/docs/working-with-transfers#backfilling
BACKFILL_DAYS_MAX = 180


class DataTransferException(Exception):
    pass


def to_utc_midnight_datetime(base_date: datetime.date) -> datetime.datetime:
    return datetime.datetime(
        year=base_date.year, month=base_date.month, day=base_date.day,
        tzinfo=datetime.timezone.utc,
    )
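
# Example: to_utc_midnight_datetime(datetime.date(2020, 4, 4)) evaluates to
# datetime.datetime(2020, 4, 4, 0, 0, tzinfo=datetime.timezone.utc).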


def trigger_backfill(
        start_date: datetime.date, end_date: datetime.date,
        transfer_config_name: str,
        client: bigquery_datatransfer.DataTransferServiceClient
) -> List[bigquery_datatransfer.types.TransferRun]:
    """
    Start a backfill of the given date range and return the list of transfer runs
    created (one per day).
    """
    logging.info(f"Starting backfill {start_date} to {end_date}")

    date_diff = (end_date - start_date).days
    if date_diff < 0:
        raise ValueError("End date must be equal to or greater than start date")
    if date_diff >= BACKFILL_DAYS_MAX:
        raise ValueError(f"A maximum of {BACKFILL_DAYS_MAX} days is allowed per backfill request")

    # end_date is midnight of the day after the last day
    end_date = to_utc_midnight_datetime(end_date) + datetime.timedelta(days=1)
    start_date = to_utc_midnight_datetime(start_date)

    response = client.start_manual_transfer_runs(
        parent=transfer_config_name,
        requested_time_range={
            "start_time": bigquery_datatransfer.types.Timestamp(
                seconds=int(start_date.timestamp())),
            "end_time": bigquery_datatransfer.types.Timestamp(
                seconds=int(end_date.timestamp())),
        },
    )
    return response.runs
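
# Example: a one-day backfill for 2020-04-04 requests
# start_time = 2020-04-04T00:00:00Z and end_time = 2020-04-05T00:00:00Z,
# since end_time is the midnight after the last requested day.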


def wait_for_transfer(transfer_name: str, timeout: int = 1200, polling_period: int = 20) -> int:
    """
    Continuously poll the run status to wait for completion.
    """
    client = bigquery_datatransfer.DataTransferServiceClient()

    state = transfer_enums.TransferState.PENDING

    time_elapsed = 0
    while (state == transfer_enums.TransferState.PENDING or
            state == transfer_enums.TransferState.RUNNING):
        try:
            transfer_run = client.get_transfer_run(transfer_name)
        except GoogleAPICallError as e:
            # grpc errors are not serializable and cannot be raised in multiprocessing
            raise DataTransferException(f"Error getting transfer run: {e.message}")

        run_date = datetime.datetime.utcfromtimestamp(transfer_run.run_time.seconds).date()

        state = transfer_run.state

        if not (state == transfer_enums.TransferState.PENDING or
                state == transfer_enums.TransferState.RUNNING):
            break

        if time_elapsed >= timeout:
            logging.info(f"Transfer for {run_date} did not complete in {timeout} seconds")
            return -1
        time.sleep(polling_period)
        time_elapsed += polling_period

    if state == transfer_enums.TransferState.SUCCEEDED:
        result = "succeeded"
    elif state == transfer_enums.TransferState.CANCELLED:
        result = "cancelled"
    elif state == transfer_enums.TransferState.FAILED:
        result = "failed"
    else:
        result = "unspecified"

    logging.info(f"Transfer for {run_date} {result}")

    return state
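
# Example: with timeout=10 and polling_period=3, a still-running transfer is
# polled at elapsed times 0, 3, 6, 9, and 12 seconds (5 polls) before the
# function gives up and returns -1.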


def start_export(project: str, transfer_config_name: str, transfer_location: str,
                 base_date: datetime.date, backfill_day_count: int = 35):
    """
    Start and wait for the completion of a backfill of `backfill_day_count` days, counting
    backwards from `base_date`. The base date is included in the backfill and counts as a
    day in the day count, i.e. `backfill_day_count=1` will backfill only the base date.
    """
    if backfill_day_count <= 0:
        raise ValueError("Number of days to backfill must be at least 1")

    client = bigquery_datatransfer.DataTransferServiceClient()
    play_store_transfer_config = client.location_transfer_config_path(
        project, transfer_location, transfer_config_name
    )

    oldest_date = base_date - datetime.timedelta(days=backfill_day_count - 1)
    end_date = base_date

    logging.info(f"Backfilling {backfill_day_count} days: {oldest_date} to {base_date}")

    transfer_results = []
    # break backfills into BACKFILL_DAYS_MAX day segments
    while True:
        start_date = max(end_date - datetime.timedelta(days=BACKFILL_DAYS_MAX - 1), oldest_date)
        transfer_runs = trigger_backfill(start_date, end_date,
                                         play_store_transfer_config, client)
        transfer_run_names = [transfer_run.name for transfer_run
                              in sorted(transfer_runs, key=lambda run: run.schedule_time.seconds)]
        end_date = start_date - datetime.timedelta(days=1)

        # wait for backfill to complete
        # days in a backfill are scheduled by the transfer service sequentially, 30 seconds
        # apart, starting from the latest date, but may run in parallel
        new_results = map(wait_for_transfer, transfer_run_names)
        transfer_results.extend(new_results)

        if start_date == oldest_date:
            break
        elif start_date < oldest_date:
            raise ValueError("start_date should not be less than oldest_date")

    successes = len([
        result for result in transfer_results
        if result == transfer_enums.TransferState.SUCCEEDED
    ])

    if len(transfer_results) != successes:
        raise DataTransferException(f"{len(transfer_results) - successes} failed dates")
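
# Example: with BACKFILL_DAYS_MAX = 2, base_date = 2020-05-05, and
# backfill_day_count = 5, the loop requests the segments
# (2020-05-04..2020-05-05), (2020-05-02..2020-05-03), and (2020-05-01..2020-05-01).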


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--date",
        type=datetime.date.fromisoformat,
        required=True,
        help="Date at which the backfill will start, going backwards"
    )
    parser.add_argument(
        "--project",
        type=str,
        required=True,
        help="Either the project that the source GCS bucket belongs to or "
             "the project that contains the transfer config"
    )
    parser.add_argument(
        "--transfer-config",
        type=str,
        required=True,
        help="ID of the transfer config. This should be a UUID."
    )
    parser.add_argument(
        "--transfer-location",
        type=str,
        default="us",
        help="Region of the transfer config (defaults to `us`)"
    )
    parser.add_argument(
        "--backfill-day-count",
        type=int,
        default=35,
        help="Number of days to backfill"
    )
    return parser.parse_args()


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    args = parse_args()
    start_export(args.project, args.transfer_config, args.transfer_location,
                 args.date, args.backfill_day_count)
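A sample invocation, assuming the play_store_export package is importable from the working directory (the project and config IDs here are placeholders):

    python -m play_store_export.export --date 2020-05-05 --project my-project \
        --transfer-config 12345678-aaaa-bbbb-cccc-1234567890ab --backfill-day-count 35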
@@ -1,7 +1,233 @@
-import pytest
import itertools
from datetime import date
from unittest import TestCase
from unittest.mock import ANY, call, MagicMock, patch

from google.cloud.bigquery_datatransfer import enums as transfer_enums

from play_store_export import export


-class TestExport(object):
class TestExport(TestCase):

-    def test_placeholder(self):
-        assert 1 == 1
    @classmethod
    def setUpClass(cls):
        cls.default_backfill_max_days = export.BACKFILL_DAYS_MAX

    def tearDown(self):
        export.BACKFILL_DAYS_MAX = self.default_backfill_max_days
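
    # Note: several tests below shrink export.BACKFILL_DAYS_MAX; tearDown
    # restores the original value so the tests stay independent.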

    @patch("google.cloud.bigquery_datatransfer.DataTransferServiceClient")
    def test_trigger_backfill_valid_date_range(self, mock_transfer_client):
        export.trigger_backfill(
            start_date=date.fromisoformat("2020-04-04"),
            end_date=date.fromisoformat("2020-04-04"),
            transfer_config_name="",
            client=mock_transfer_client,
        )

        export.trigger_backfill(
            start_date=date.fromisoformat("2020-04-04"),
            end_date=date.fromisoformat("2020-04-05"),
            transfer_config_name="",
            client=mock_transfer_client,
        )

        self.assertRaises(
            ValueError,
            export.trigger_backfill,
            start_date=date.fromisoformat("2020-04-04"),
            end_date=date.fromisoformat("2020-04-03"),
            transfer_config_name="",
            client=mock_transfer_client,
        )

    @patch("google.cloud.bigquery_datatransfer.DataTransferServiceClient")
    def test_trigger_backfill_max_days(self, mock_transfer_client):
        export.trigger_backfill(
            start_date=date.fromisoformat("2020-04-04"),
            end_date=date.fromisoformat("2020-05-04"),
            transfer_config_name="",
            client=mock_transfer_client,
        )

        self.assertRaises(
            ValueError,
            export.trigger_backfill,
            start_date=date.fromisoformat("2020-04-04"),
            end_date=date.fromisoformat("2021-04-04"),
            transfer_config_name="",
            client=mock_transfer_client,
        )

    @patch("google.cloud.bigquery_datatransfer.DataTransferServiceClient")
    @patch("google.cloud.bigquery_datatransfer.types")
    def test_trigger_backfill_transfer_run_args(self, mock_types, mock_transfer_client):
        mock_types.Timestamp = dict
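        # Substituting dict for the Timestamp type lets the expected call
        # arguments below be written as plain dicts.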

        config_name = "config_name_1"
        start_date = date.fromisoformat("2020-04-04")

        export.trigger_backfill(
            start_date=start_date,
            end_date=date.fromisoformat("2020-05-04"),
            transfer_config_name=config_name,
            client=mock_transfer_client,
        )

        mock_transfer_client.start_manual_transfer_runs.assert_called_with(
            parent=config_name,
            requested_time_range={
                "start_time": {
                    "seconds": int(export.to_utc_midnight_datetime(start_date).timestamp())
                },
                "end_time": {
                    "seconds": int(export.to_utc_midnight_datetime(
                        date.fromisoformat("2020-05-05")).timestamp())
                },
            },
        )

    @patch("google.cloud.bigquery_datatransfer.DataTransferServiceClient")
    @patch("time.sleep")
    def test_wait_for_transfer_success(self, mock_sleep, mock_transfer_client):
        mock_transfer_client.return_value = mock_transfer_client

        mock_transfer_client.get_transfer_run.side_effect = [
            MagicMock(state=transfer_enums.TransferState.PENDING),
            MagicMock(state=transfer_enums.TransferState.RUNNING),
            MagicMock(state=transfer_enums.TransferState.RUNNING),
            MagicMock(state=transfer_enums.TransferState.SUCCEEDED),
            MagicMock(state=transfer_enums.TransferState.SUCCEEDED),
        ]

        result = export.wait_for_transfer("transfer", timeout=10, polling_period=1)

        self.assertEqual(result, transfer_enums.TransferState.SUCCEEDED)
        self.assertEqual(mock_transfer_client.get_transfer_run.call_count, 4)

    @patch("google.cloud.bigquery_datatransfer.DataTransferServiceClient")
    @patch("time.sleep")
    def test_wait_for_transfer_timeout_exact(self, mock_sleep, mock_transfer_client):
        mock_transfer_client.return_value = mock_transfer_client

        mock_transfer_client.get_transfer_run.side_effect = itertools.repeat(
            MagicMock(state=transfer_enums.TransferState.RUNNING)
        )

        result = export.wait_for_transfer("transfer", timeout=10, polling_period=1)

        self.assertEqual(result, -1)
        self.assertEqual(mock_transfer_client.get_transfer_run.call_count, 11)

    @patch("google.cloud.bigquery_datatransfer.DataTransferServiceClient")
    @patch("time.sleep")
    def test_wait_for_transfer_timeout_inexact(self, mock_sleep, mock_transfer_client):
        mock_transfer_client.return_value = mock_transfer_client

        mock_transfer_client.get_transfer_run.side_effect = itertools.repeat(
            MagicMock(state=transfer_enums.TransferState.RUNNING)
        )

        result = export.wait_for_transfer("transfer", timeout=10, polling_period=3)

        self.assertEqual(result, -1)
        self.assertEqual(mock_transfer_client.get_transfer_run.call_count, 5)

    @classmethod
    def mock_transfer_run(cls, name, schedule_time):
        mock_transfer = MagicMock()
        mock_transfer.name = name
        mock_transfer.schedule_time.seconds = schedule_time
        return mock_transfer
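
    # Note: `name` is assigned after construction because MagicMock(name=...)
    # configures the mock's own name rather than setting a `name` attribute.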

    @patch("google.cloud.bigquery_datatransfer.DataTransferServiceClient")
    @patch("play_store_export.export.trigger_backfill")
    @patch("play_store_export.export.wait_for_transfer")
    def test_export_max_days_under(self, mock_wait_for_transfer,
                                   mock_trigger_backfill, mock_transfer_client):
        export.BACKFILL_DAYS_MAX = 5

        mock_trigger_backfill.return_value = [
            self.mock_transfer_run("c", 30),
            self.mock_transfer_run("d", 40),
            self.mock_transfer_run("b", 20),
            self.mock_transfer_run("a", 10),
        ]
        mock_wait_for_transfer.return_value = transfer_enums.TransferState.SUCCEEDED

        export.start_export("project", "config", "us",
                            base_date=date(2020, 5, 5), backfill_day_count=4)

        mock_trigger_backfill.assert_called_once_with(date(2020, 5, 2), date(2020, 5, 5), ANY, ANY)
        mock_wait_for_transfer.assert_has_calls([call("a"), call("b"), call("c"), call("d")])

    @patch("google.cloud.bigquery_datatransfer.DataTransferServiceClient")
    @patch("play_store_export.export.trigger_backfill")
    @patch("play_store_export.export.wait_for_transfer")
    def test_export_max_days_equal(self, mock_wait_for_transfer,
                                   mock_trigger_backfill, mock_transfer_client):
        export.BACKFILL_DAYS_MAX = 3

        mock_trigger_backfill.return_value = [
            self.mock_transfer_run("d", 40),
            self.mock_transfer_run("b", 20),
            self.mock_transfer_run("a", 10),
        ]
        mock_wait_for_transfer.return_value = transfer_enums.TransferState.SUCCEEDED

        export.start_export("project", "config", "us",
                            base_date=date(2020, 5, 5), backfill_day_count=3)

        mock_trigger_backfill.assert_called_once_with(date(2020, 5, 3), date(2020, 5, 5), ANY, ANY)
        mock_wait_for_transfer.assert_has_calls([call("a"), call("b"), call("d")])

    @patch("google.cloud.bigquery_datatransfer.DataTransferServiceClient")
    @patch("play_store_export.export.trigger_backfill")
    @patch("play_store_export.export.wait_for_transfer")
    def test_export_max_days_over(self, mock_wait_for_transfer,
                                  mock_trigger_backfill, mock_transfer_client):
        export.BACKFILL_DAYS_MAX = 2

        mock_trigger_backfill.side_effect = [
            [
                self.mock_transfer_run("a", 10),
                self.mock_transfer_run("b", 20),
            ],
            [
                self.mock_transfer_run("d", 40),
                self.mock_transfer_run("c", 30),
            ],
            [
                self.mock_transfer_run("e", 50),
            ],
        ]
        mock_wait_for_transfer.return_value = transfer_enums.TransferState.SUCCEEDED

        export.start_export("project", "config", "us",
                            base_date=date(2020, 5, 5), backfill_day_count=5)

        mock_trigger_backfill.assert_has_calls([
            call(date(2020, 5, 4), date(2020, 5, 5), ANY, ANY),
            call(date(2020, 5, 2), date(2020, 5, 3), ANY, ANY),
            call(date(2020, 5, 1), date(2020, 5, 1), ANY, ANY),
        ])
        mock_wait_for_transfer.assert_has_calls(
            [call("a"), call("b"), call("c"), call("d"), call("e")]
        )

    @patch("google.cloud.bigquery_datatransfer.DataTransferServiceClient")
    @patch("play_store_export.export.trigger_backfill")
    @patch("play_store_export.export.wait_for_transfer")
    def test_export_failed_transfer(self, mock_wait_for_transfer,
                                    mock_trigger_backfill, mock_transfer_client):
        mock_trigger_backfill.return_value = [
            self.mock_transfer_run("a", 10),
        ]
        mock_wait_for_transfer.return_value = transfer_enums.TransferState.FAILED

        self.assertRaises(export.DataTransferException, export.start_export,
                          "project", "config", "us",
                          base_date=date(2020, 5, 5), backfill_day_count=1)
        mock_trigger_backfill.assert_called_once_with(date(2020, 5, 5), date(2020, 5, 5), ANY, ANY)
        mock_wait_for_transfer.assert_has_calls([call("a")])
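
The tests use only the standard library runner, so a minimal way to run them, assuming they sit in a discoverable tests directory, is:

    python -m unittest discover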