Change fxci metric export to process all data from yesterday, rather than from 10 minutes ago (#253)

Andrew Halberstadt 2024-08-07 11:29:55 -04:00 committed by GitHub
Parent 4c4891b07a
Commit 2068dd0128
No key found matching this signature
GPG key ID: B5690EEEBB952194
10 changed files: 102 additions and 45 deletions

View file

@@ -73,7 +73,7 @@ class Config:
     # Depending on the commands being run, the pulse or monitoring
     # configs may not be necessary.
     pulse: Optional[PulseConfig]
-    monitoring: Optional[MonitoringConfig]
+    monitoring: MonitoringConfig = MonitoringConfig()

     @classmethod
     def from_dict(cls, data: dict[str, Any]) -> "Config":
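For context on why the Optional wrapper can be dropped: giving the field a default instance means config.monitoring is always set, so callers never need a None check. A minimal sketch of the pattern, with hypothetical field names standing in for the real config options (frozen=True keeps the default instance hashable, which stdlib dataclasses require of class-level defaults):

from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class MonitoringConfig:
    # hypothetical field standing in for the real monitoring options
    interval: int = 3600

@dataclass(frozen=True)
class Config:
    # pulse stays Optional: it may be absent entirely
    pulse: Optional[str] = None
    # monitoring now defaults to an instance, so it is never None
    monitoring: MonitoringConfig = MonitoringConfig()

cfg = Config()
print(cfg.monitoring.interval)  # safe without a None check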

View file

@@ -6,6 +6,7 @@ from cleo.commands.command import Command
 from cleo.helpers import option

 from fxci_etl.config import Config
+from fxci_etl.loaders.bigquery import BigQueryLoader
 from fxci_etl.metric.export import export_metrics
 from fxci_etl.pulse.consume import drain

@@ -14,7 +15,7 @@ APP_NAME = "fxci-etl"

 class ConfigCommand(Command):
     options = [
-        option("--config", description="Path to config file to use.", default=None)
+        option("--config", description="Path to config file to use.", flag=False, default=None)
     ]

     def parse_config(self, config_path: str | Path | None) -> Config:

@@ -30,9 +31,10 @@ class PulseDrainCommand(ConfigCommand):
     def handle(self):
         config = self.parse_config(self.option("config"))
+        callbacks = [BigQueryLoader(config)]

         for queue in config.pulse.queues:
             self.line(f"Draining queue {queue}")
-            drain(config, queue)
+            drain(config, queue, callbacks)

         return 0
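The flag=False fix matters because cleo treats options as boolean switches by default, so "--config path.toml" would previously not have been parsed as an option with a value. A minimal sketch of the distinction, with a hypothetical command name:

from cleo.commands.command import Command
from cleo.helpers import option

class ShowConfigCommand(Command):
    name = "show-config"
    options = [
        # Without flag=False, cleo treats --config as a valueless boolean
        # switch and "--config some/path.toml" is rejected or misparsed;
        # flag=False makes the option accept a value.
        option("config", description="Path to config file to use.", flag=False, default=None),
    ]

    def handle(self) -> int:
        self.line(f"config file: {self.option('config')}")
        return 0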

View file

@@ -103,14 +103,14 @@ class BigQueryLoader:
         self.bucket = self.storage_client.bucket(config.storage.bucket)
         self._record_backup = self.bucket.blob("failed-bq-records.json")

-    def ensure_table(self, name: str, cls: Type[Record]):
+    def ensure_table(self, name: str, cls_: Type[Record]):
         """Checks if the table exists in BQ and creates it otherwise.

         Fails if the table exists but has the wrong schema.
         """
         print(f"Ensuring table {name} exists.")
         bq = self.config.bigquery
-        schema = generate_schema(cls)
+        schema = generate_schema(cls_)
         partition = TimePartitioning(
             type_=TimePartitioningType.DAY,

@@ -121,8 +121,9 @@
         table.time_partitioning = partition
         self.client.create_table(table, exists_ok=True)

-    def get_table(self, name: str) -> Table:
+    def get_table(self, name: str, cls_: Type[Record]) -> Table:
         if name not in self._tables:
+            self.ensure_table(name, cls_)
             bq = self.config.bigquery
             self._tables[name] = self.client.get_table(
                 f"{bq.project}.{bq.dataset}.{name}"

@@ -144,13 +145,13 @@
         tables = {}
         for record in records:
             if record.table not in tables:
-                self.ensure_table(record.table, record.__class__)
                 tables[record.table] = []
             tables[record.table].append(record)

         failed_records = []
         for name, rows in tables.items():
-            table = self.get_table(name)
+            print(f"Attempting to insert {len(rows)} records into table '{name}'")
+            table = self.get_table(name, rows[0].__class__)
             errors = self.client.insert_rows(table, [asdict(row) for row in rows])
             for error in errors:
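The net effect of these three hunks: table creation moves out of insert() and into get_table(), which ensures the table exists on first access and caches the handle. A condensed sketch of that pattern; the client and schema pieces are stubbed here, while the real class wraps google-cloud-bigquery:

from typing import Any, Type

class LoaderSketch:
    def __init__(self, client: Any, dataset: str):
        self.client = client
        self.dataset = dataset
        self._tables: dict[str, Any] = {}

    def ensure_table(self, name: str, cls_: Type) -> None:
        """Create the table from cls_'s schema if it doesn't exist (stubbed)."""

    def get_table(self, name: str, cls_: Type) -> Any:
        # First access ensures the table exists, then caches the handle so
        # subsequent inserts skip both the creation check and the lookup.
        if name not in self._tables:
            self.ensure_table(name, cls_)
            self._tables[name] = self.client.get_table(f"{self.dataset}.{name}")
        return self._tables[name]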

View file

@@ -4,6 +4,7 @@ from dataclasses import dataclass
 from datetime import datetime, timedelta
 from pprint import pprint

+import pytz
 from google.cloud import storage
 from google.cloud.exceptions import NotFound
 from google.cloud.monitoring_v3 import (

@@ -20,7 +21,7 @@ from fxci_etl.loaders.bigquery import BigQueryLoader, BigQueryTypes as t, Record

 METRIC = "compute.googleapis.com/instance/uptime"
 DEFAULT_INTERVAL = 3600 * 6
-MINIMUM_INTERVAL = 10
+MIN_BUFFER_TIME = 10  # minutes

@@ -87,21 +88,32 @@ class MetricExporter:
         return results

     def get_time_interval(self) -> TimeInterval:
-        # Set end time to ten minutes in the past to ensure Google Cloud Monitoring
-        # has finished computing all of its metrics.
-        end_time = datetime.now() - timedelta(minutes=10)
+        """Return the time interval to query metrics over.
+
+        This will grab all metrics from the last end time, up until
+        11:59:59 of yesterday. Ideally the metric export runs in a daily cron
+        task, such that it exports a day's worth of data at a time.
+        """
+        utc = pytz.UTC
+        now = datetime.now(utc)
+        yesterday = now.date() - timedelta(days=1)
+        end_time = utc.localize(datetime.combine(yesterday, datetime.max.time()))
+
+        # Make sure end_time is at least MIN_BUFFER_TIME minutes in the past,
+        # so Cloud Monitoring has finished adding metrics for the prior day.
+        if now <= end_time + timedelta(minutes=MIN_BUFFER_TIME):
+            raise Exception(f"Abort: metric export ran too close to {end_time}! "
+                            f"It must run at least {MIN_BUFFER_TIME} minutes after this time.")
         try:
             start_time = json.loads(self.last_export.download_as_string())["end_time"]
         except NotFound:
-            start_time = int(
-                (end_time - timedelta(seconds=MINIMUM_INTERVAL)).timestamp()
-            )
+            start_time = int(utc.localize(datetime.combine(yesterday, datetime.min.time())).timestamp())

         end_time = int(end_time.timestamp())
-        if start_time + MINIMUM_INTERVAL > end_time:
-            raise Exception("Abort: metric export ran too recently!")
+        if start_time >= end_time:
+            raise Exception(f"Abort: metric export already ran for {yesterday}!")

         return TimeInterval(
             end_time=Timestamp(seconds=end_time),
@@ -134,8 +146,8 @@ def export_metrics(config: Config, dry_run: bool = False) -> int:
                         "uptime": round(ts.points[0].value.double_value, 2),
                         "interval_start_time": ts.points[
                             0
-                        ].interval.start_time.timestamp(),
-                        "interval_end_time": ts.points[0].interval.end_time.timestamp(),
+                        ].interval.start_time.timestamp(),  # type: ignore
+                        "interval_end_time": ts.points[0].interval.end_time.timestamp(),  # type: ignore
                     }
                 )
             )

@@ -146,8 +158,8 @@
     if not records:
         raise Exception("Abort: No records retrieved!")

-    exporter.set_last_end_time(int(interval.end_time.timestamp()))
     loader = BigQueryLoader(config)
     loader.insert(records)
+    exporter.set_last_end_time(int(interval.end_time.timestamp()))  # type: ignore
     return 0
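Two things are worth noting here. First, the checkpoint write (set_last_end_time) now happens only after loader.insert(records) succeeds, so a failed load no longer advances the checkpoint and the next run retries the same day. Second, the new interval math can be checked in isolation; a standalone sketch with a concrete date (pytz as imported above, the cron time is illustrative):

from datetime import datetime, timedelta
import pytz

utc = pytz.UTC
# Pretend the daily cron fires at 04:00 UTC on 2024-08-02.
now = utc.localize(datetime(2024, 8, 2, 4, 0, 0))
yesterday = now.date() - timedelta(days=1)                              # 2024-08-01
start = utc.localize(datetime.combine(yesterday, datetime.min.time()))  # 00:00:00
end = utc.localize(datetime.combine(yesterday, datetime.max.time()))    # 23:59:59.999999

# The MIN_BUFFER_TIME guard: a run at 00:05 UTC would be within 10 minutes
# of yesterday's end time and abort; a 04:00 run passes comfortably.
assert now > end + timedelta(minutes=10)
print(start.isoformat(), "->", end.isoformat())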

View file

@@ -40,9 +40,7 @@
     return consumer


-def drain(config: Config, name: str):
-    callbacks = [BigQueryHandler(config)]
+def drain(config: Config, name: str, callbacks: list[PulseHandler]):
     with get_connection(config) as connection:
         with get_consumer(config, connection, name, callbacks) as consumer:
             while True:
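With the handler list injected, callers decide what consumes the drained messages; drain() no longer hard-codes a handler itself. A hypothetical caller mirroring PulseDrainCommand above:

from fxci_etl.config import Config
from fxci_etl.loaders.bigquery import BigQueryLoader
from fxci_etl.pulse.consume import drain

def drain_all_queues(config: Config) -> None:
    assert config.pulse is not None  # pulse config is required for draining
    # Build the callbacks once and reuse them for every queue; tests can
    # pass lightweight fakes here instead of a real BigQuery-backed handler.
    callbacks = [BigQueryLoader(config)]
    for queue in config.pulse.queues:
        drain(config, queue, callbacks)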

View file

@@ -5,3 +5,4 @@ google-cloud-bigquery ~= 3.22.0
 google-cloud-monitoring ~= 2.21.0
 google-cloud-storage ~= 2.16.0
 kombu ~= 5.3.7
+pytz == 2024.1

View file

@@ -343,6 +343,10 @@ python-dateutil==2.9.0.post0 \
     --hash=sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3 \
     --hash=sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427
     # via google-cloud-bigquery
+pytz==2024.1 \
+    --hash=sha256:2a29735ea9c18baf14b448846bde5a48030ed267578472d8955cd0e7443a9812 \
+    --hash=sha256:328171f4e3623139da4983451950b28e95ac706e13f3f2630a879749e7a8b319
+    # via -r base.in
 rapidfuzz==3.9.4 \
     --hash=sha256:005a02688a51c7d2451a2d41c79d737aa326ff54167211b78a383fc2aace2c2c \
     --hash=sha256:015b5080b999404fe06ec2cb4f40b0be62f0710c926ab41e82dfbc28e80675b4 \

View file

@@ -1,5 +1,4 @@
 #!/usr/bin/env bash
 script_dir=$(dirname "$(realpath "$0")")

 if ! command -v uv &> /dev/null; then
     cat <<EOF

@@ -16,7 +15,7 @@ min_python_version="3.12"

 pushd $script_dir
 for requirement_in in *.in; do
-    uv pip compile "$requirement_in" --python-version $min_python_version --universal --generate-hashes --output-file "${requirement_in%.*}.txt"
+    uv pip compile "$requirement_in" --python-version $min_python_version --universal --generate-hashes --output-file "${requirement_in%.*}.txt" ${@}
 done
 popd
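The ${@} suffix forwards any extra arguments given to this script straight through to uv pip compile, so, for example, invoking the script with --upgrade refreshes the pinned versions while the routine invocation stays unchanged. (Quoting it as "$@" would be the more robust spelling if arguments containing spaces were a concern.)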

View file

@@ -373,6 +373,10 @@ python-dateutil==2.9.0.post0 \
     # via
     #   freezegun
     #   google-cloud-bigquery
+pytz==2024.1 \
+    --hash=sha256:2a29735ea9c18baf14b448846bde5a48030ed267578472d8955cd0e7443a9812 \
+    --hash=sha256:328171f4e3623139da4983451950b28e95ac706e13f3f2630a879749e7a8b319
+    # via -r base.in
 rapidfuzz==3.9.5 \
     --hash=sha256:031806eb035a6f09f4ff23b9d971d50b30b5e93aa3ee620c920bee1dc32827e7 \
     --hash=sha256:0d34b0e8e29f80cb2ac8afe8fb7b01a542b136ffbf7e2b9983d11bce49398f68 \

View file

@@ -1,6 +1,9 @@
 from datetime import datetime, timedelta
+from math import floor
 from unittest.mock import call

 import pytest
+import pytz
 from freezegun import freeze_time
 from google.cloud.exceptions import NotFound
 from google.cloud.monitoring_v3 import Aggregation, ListTimeSeriesRequest, TimeInterval
@@ -9,8 +12,12 @@ from google.protobuf.timestamp_pb2 import Timestamp

 from fxci_etl.metric import export


+@pytest.fixture(autouse=True)
+def patch_gcp_clients(mocker):
+    mocker.patch.object(export, "storage", mocker.Mock())
+    mocker.patch.object(export, "MetricServiceClient", mocker.Mock())


+@freeze_time("2024-08-01")
 def test_metric_exporter_get_timeseries(mocker, make_config):
     # constants
     project = "proj"

@@ -22,10 +29,6 @@ def test_metric_exporter_get_timeseries(mocker, make_config):
         end_time=Timestamp(seconds=end_time),
     )

-    # mock
-    mocker.patch.object(export, "storage", mocker.Mock())
-    mocker.patch.object(export, "MetricServiceClient", mocker.Mock())

     # test
     config = make_config()
     exporter = export.MetricExporter(config)
@@ -49,16 +52,14 @@

     ]


-@freeze_time("2024-08-01")
+@freeze_time("2024-08-01 04:00:00")
 def test_metric_exporter_get_time_interval(mocker, make_config):
     # constants
-    now = datetime.now()
-    prev_end_time = now - timedelta(hours=1)
-    # mock
-    mocker.patch.object(export, "storage", mocker.Mock())
-    mocker.patch.object(export, "MetricServiceClient", mocker.Mock())
+    utc = pytz.UTC
+    now = datetime.now(utc)
+    prev_end_time = now - timedelta(hours=12)
+    yesterday = now.date() - timedelta(days=1)
+    expected_end_time = utc.localize(datetime.combine(yesterday, datetime.max.time()))

     config = make_config()
     exporter = export.MetricExporter(config)
@@ -70,19 +71,54 @@
     result = exporter.get_time_interval()

     assert isinstance(result, TimeInterval)
     assert result.start_time.timestamp() == prev_end_time.timestamp()  # type: ignore
-    assert result.end_time.timestamp() == (now - timedelta(minutes=10)).timestamp()  # type: ignore
+    assert result.end_time.timestamp() == floor(expected_end_time.timestamp())  # type: ignore


+@freeze_time("2024-08-01 04:00:00")
 def test_metric_exporter_get_time_interval_no_prev_end_time(mocker, make_config):
     # constants
+    utc = pytz.UTC
+    now = datetime.now(utc)
+    yesterday = now.date() - timedelta(days=1)
+    expected_start_time = utc.localize(datetime.combine(yesterday, datetime.min.time()))
+    expected_end_time = utc.localize(datetime.combine(yesterday, datetime.max.time()))

     config = make_config()
     exporter = export.MetricExporter(config)

     # test last_end_time not found
     exporter.last_export.download_as_string.side_effect = NotFound("")  # type: ignore
     result = exporter.get_time_interval()

     assert isinstance(result, TimeInterval)
     assert (
-        result.start_time.timestamp()  # type: ignore
-        == (
-            now - timedelta(minutes=10) - timedelta(seconds=export.MINIMUM_INTERVAL)
-        ).timestamp()
+        result.start_time.timestamp() == expected_start_time.timestamp()  # type: ignore
     )
-    assert result.end_time.timestamp() == (now - timedelta(minutes=10)).timestamp()  # type: ignore
+    assert result.end_time.timestamp() == floor(expected_end_time.timestamp())  # type: ignore


+@freeze_time("2024-08-01 00:05:00")
+def test_metric_exporter_get_time_interval_too_close_to_midnight(make_config):
+    config = make_config()
+    exporter = export.MetricExporter(config)
+
+    with pytest.raises(Exception):
+        exporter.get_time_interval()


+@freeze_time("2024-08-01 04:00:00")
+def test_metric_exporter_get_time_interval_already_ran(make_config):
+    # constants
+    utc = pytz.UTC
+    now = datetime.now(utc)
+    prev_end_time = now - timedelta(hours=1)
+
+    # test
+    config = make_config()
+    exporter = export.MetricExporter(config)
+    exporter.last_export.download_as_string.return_value = (  # type: ignore
+        f'{{"end_time": {int(prev_end_time.timestamp())}}}'
+    )
+
+    with pytest.raises(Exception):
+        exporter.get_time_interval()


 def test_export_metrics(mocker, make_config):
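A note on the floor() calls in the assertions above: datetime.max.time() carries .999999 microseconds, while the exporter stores whole seconds (int(end_time.timestamp())), so the timestamp read back from the TimeInterval is truncated. A quick check of that arithmetic, assuming pytz as imported above:

from datetime import datetime
from math import floor
import pytz

end = pytz.UTC.localize(datetime(2024, 7, 31, 23, 59, 59, 999999))
print(end.timestamp())         # 1722470399.999999
print(floor(end.timestamp()))  # 1722470399, the whole-second value the exporter stores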