Mirror of https://github.com/mozilla/docker-etl.git
Miscellaneous fixes for the firefox-ci export job (#230)
* fix(fxci): make bigquery table names configurable

  The tables are defined and versioned in bigquery-etl. Make the names
  configurable so we don't need to rebuild the image when they change.

* fix(fxci): don't assume the last run is the one that generated the pulse event

  While testing, I discovered that it's possible for a pulse event to contain
  runs later than the one that generated the event. So it's not safe to simply
  grab the last run in the list. Instead we should use the run from the index
  in the top-level `runId` field.

* fix(fxci): skip tasks that hit 'deadline-exceeded'
This commit is contained in:

Parent: 7a9d942896
Commit: 2d39b8b0c2
@@ -35,10 +35,18 @@ class PulseConfig:
     }
 
 
+@dataclass(frozen=True)
+class BigQueryTableConfig:
+    metrics: str = "worker_metrics_v1"
+    tasks: str = "tasks_v1"
+    runs: str = "task_runs_v1"
+
+
 @dataclass(frozen=True)
 class BigQueryConfig:
     project: str
     dataset: str
+    tables: BigQueryTableConfig = BigQueryTableConfig()
     credentials: Optional[str] = None
 
 
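For context, a minimal usage sketch of the new config dataclasses. The values below are hypothetical, and how fxci-etl actually populates its config (file, environment, etc.) may differ; the point is that the table names can be swapped without rebuilding the image.

# Hypothetical override: point the job at new table versions published by
# bigquery-etl without changing the image.
bq = BigQueryConfig(
    project="my-gcp-project",
    dataset="fxci_dataset",
    tables=BigQueryTableConfig(
        metrics="worker_metrics_v2",
        tasks="tasks_v2",
        runs="task_runs_v2",
    ),
)

# Fully qualified id of the runs table, e.g. for a load job:
print(f"{bq.project}.{bq.dataset}.{bq.tables.runs}")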
@@ -1,7 +1,7 @@
 from abc import ABC, abstractmethod
 import base64
 from collections import defaultdict
-from dataclasses import asdict, dataclass
+from dataclasses import InitVar, asdict, dataclass
 import json
 import os
 from pprint import pprint
@@ -17,13 +17,15 @@ from fxci_etl.config import Config
 
 @dataclass
 class Record(ABC):
-    @classmethod
-    def from_dict(cls, data: dict[str, Any]) -> "Record":
-        return dacite.from_dict(data_class=cls, data=data)
+    table_name: InitVar[str]
+
+    def __post_init__(self, table_name):
+        self.table = table_name
 
     @classmethod
-    @abstractmethod
-    def table_name(cls) -> str: ...
+    def from_dict(cls, table_name: str, data: dict[str, Any]) -> "Record":
+        data["table_name"] = table_name
+        return dacite.from_dict(data_class=cls, data=data)
 
     @abstractmethod
     def __str__(self) -> str: ...
@@ -72,7 +74,7 @@ class BigQueryLoader:
 
         tables = defaultdict(list)
         for record in records:
-            tables[record.table_name()].append(record)
+            tables[record.table].append(record)
 
         failed_records = []
         for name, rows in tables.items():
@@ -32,10 +32,6 @@ class WorkerUptime(Record):
     interval_start_time: float
     interval_end_time: float
 
-    @classmethod
-    def table_name(cls):
-        return "worker_uptime"
-
     def __str__(self):
         return f"worker {self.instance_id}"
 
@@ -130,6 +126,7 @@ def export_metrics(config: Config, dry_run: bool = False) -> int:
 
         records.append(
             WorkerUptime.from_dict(
+                config.bigquery.tables.metrics,
                 {
                     "project": ts.resource.labels["project_id"],
                     "zone": ts.resource.labels["zone"],
@@ -91,8 +91,14 @@ class BigQueryHandler(PulseHandler):
 
     def process_event(self, event):
         data = event.data
+
+        if data.get("runId") is None:
+            # This can happen if `deadline` was exceeded before a run could
+            # start. Ignore this case.
+            return
+
         status = data["status"]
-        run = data["status"]["runs"][-1]
+        run = data["status"]["runs"][data["runId"]]
         run_record = {
             "task_id": status["taskId"],
             "reason_created": run["reasonCreated"],
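Two hypothetical pulse payloads (not taken from the repository) illustrating why the handler now indexes `runs` by the top-level `runId` and bails out when `runId` is missing:

# Case 1: the event was generated by run 0, but the status already contains a
# later retry run. runs[-1] would pick the wrong run; indexing by runId does not.
retried = {
    "runId": 0,
    "status": {
        "taskId": "abc123",
        "runs": [
            {"runId": 0, "reasonCreated": "scheduled"},
            {"runId": 1, "reasonCreated": "retry"},
        ],
    },
}
run = retried["status"]["runs"][retried["runId"]]
assert run["reasonCreated"] == "scheduled"

# Case 2: the deadline was exceeded before any run started, so there is no
# runId; the handler returns early instead of failing on the lookup.
deadline_exceeded = {"status": {"taskId": "def456", "runs": []}}
assert deadline_exceeded.get("runId") is None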
@@ -111,7 +117,9 @@ class BigQueryHandler(PulseHandler):
         if "workerId" in run:
             run_record["worker_id"] = run["workerId"]
 
-        self.records.append(Run.from_dict(run_record))
+        self.records.append(
+            Run.from_dict(self.config.bigquery.tables.runs, run_record)
+        )
 
         if data["runId"] == 0:
             # Only insert the task record for run 0 to avoid duplicate records.
@@ -128,7 +136,9 @@ class BigQueryHandler(PulseHandler):
                 task_record["tags"] = [
                     {"key": k, "value": v} for k, v in data["task"]["tags"].items()
                 ]
-                self.records.append(Task.from_dict(task_record))
+                self.records.append(
+                    Task.from_dict(self.config.bigquery.tables.tasks, task_record)
+                )
             except Exception:
                 # Don't insert the run without its corresponding task.
                 self.records.pop()
@@ -16,10 +16,6 @@ class Run(Record):
     worker_group: str
     worker_id: str
 
-    @classmethod
-    def table_name(cls):
-        return "task_runs"
-
     def __str__(self):
         return f"{self.task_id} run {self.run_id}"
 
@@ -38,9 +34,5 @@ class Task(Record):
     task_queue_id: str
     tags: list[Tag]
 
-    @classmethod
-    def table_name(cls):
-        return "tasks"
-
     def __str__(self):
         return self.task_id