[dev dap collector] support more time precision settings on tasks (#234)

* support more time precision settings on tasks

* make the same separate functions for checking time precision and date validity

* review feedback
This commit is contained in:
Daniel Mueller 2024-07-16 10:42:34 -04:00 коммит произвёл GitHub
Родитель ef4e640ab9
Коммит 57eda920a5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
1 изменённых файлов: 76 добавлений и 11 удалений

Просмотреть файл

@@ -9,9 +9,7 @@ import requests
# URL supplied as the --leader argument of the collect command built from CMD.
LEADER = "https://dap-09-3.api.divviup.org"
# Command-line template for the ./collect binary. LEADER is substituted when
# this f-string is evaluated; the doubled braces leave {task_id}, {vdaf},
# {vdaf_args}, {auth_token}, {timestamp}, {duration}, {hpke_config} and
# {hpke_private_key} as literal placeholders to be filled in later
# (presumably via str.format -- the call site is not visible here).
CMD = f"./collect --task-id {{task_id}} --leader {LEADER} --vdaf {{vdaf}} {{vdaf_args}} --authorization-bearer-token {{auth_token}} --batch-interval-start {{timestamp}} --batch-interval-duration {{duration}} --hpke-config {{hpke_config}} --hpke-private-key {{hpke_private_key}}"
# Interval length handed to collect_many (presumably seconds -- TODO confirm).
INTERVAL_LENGTH = 300
# Job interval length (presumably minutes -- confirm against its users).
JOB_INTERVAL = 15
# NOTE(review): meaning is not evident from the visible code; presumably the
# number of ads covered by one task -- confirm.
TASK_AD_SIZE = 5
# Number of minutes in one day (24 * 60).
MINUTES_IN_DAY = 1440
ADS_SCHEMA = [
bigquery.SchemaField("collection_time", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("placement_id", "STRING", mode="REQUIRED"),
@@ -165,6 +163,22 @@ def build_base_report(task_id, timestamp, metric_type, collection_time):
return row
def build_error_result(task_id, timestamp, metric_type, error):
    """Build a results payload that carries a single error report.

    The payload has the same two keys as a regular collection result:
    "counts" (left empty) and "reports" (holding exactly one report).
    The report is created by build_base_report, stamped with the current
    UTC time (as a stringified POSIX timestamp), given a collection
    duration of 0 and annotated with ``error``.

    Args:
        task_id: Identifier of the task the error belongs to.
        timestamp: Timestamp forwarded to build_base_report.
        metric_type: Metric type forwarded to build_base_report.
        error: Error description stored under the report's "error" key.

    Returns:
        dict with keys "counts" (empty list) and "reports" (one-element list).
    """
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    error_report = build_base_report(
        task_id, timestamp, metric_type, str(now_utc.timestamp())
    )
    # Nothing was collected, so the duration is reported as zero.
    error_report["collection_duration"] = 0
    error_report["error"] = error
    return {"counts": [], "reports": [error_report]}
def get_ad(task_id, index):
global ads
for ad in ads:
@@ -207,18 +221,69 @@ async def collect_many(
return results
def check_collection_date(date):
    """Check that a collection date falls exactly on the beginning of a day.

    The collector collects through to the beginning of a day, so every
    time-of-day component of ``date`` must be zero. Microseconds are
    checked as well: a value such as 00:00:00.000001 is not the beginning
    of a day even though hour, minute and second are all zero.

    Args:
        date: datetime to validate.

    Returns:
        None when ``date`` is at the beginning of a day, otherwise an
        error message string.
    """
    time_of_day = (date.hour, date.minute, date.second, date.microsecond)
    if time_of_day != (0, 0, 0, 0):
        return f"Collection date is not at beginning of a day {date}"
    return None
def check_time_precision(time_precision_minutes, end_collection_date):
    """Check that a given time precision is valid for the collection date.

    A time precision is accepted when it is a positive number of minutes
    and either
      * is shorter than a day and evenly divides a day, or
      * is a whole multiple of a day and ``end_collection_date`` falls on
        a boundary of the task's aggregation buckets (its POSIX timestamp
        is a multiple of the precision expressed in seconds).

    Args:
        time_precision_minutes: Task time precision in minutes, or None
            when the task has no such setting.
        end_collection_date: datetime marking the end of the collection.

    Returns:
        None when the precision is usable, otherwise an error message
        string describing why it is not.
    """
    if time_precision_minutes is None:
        # task is missing a time precision setting
        return "Task missing time time_precision_minutes value"

    if time_precision_minutes <= 0:
        # Zero would raise ZeroDivisionError in the modulo checks below, and
        # a negative value could slip through them (the sign of % follows
        # the divisor), so reject both explicitly.
        return "Task has time precision that is not a positive number of minutes"

    if time_precision_minutes < MINUTES_IN_DAY:
        if MINUTES_IN_DAY % time_precision_minutes != 0:
            # time precision has to evenly divide a day in order for this
            # collector code to query all aggregations
            return "Task has time precision that does not evenly divide a day"
        return None

    if time_precision_minutes % MINUTES_IN_DAY != 0:
        # time precision is a day or longer, but is not a multiple of a day
        return "Task has time precision that is not an even multiple of a day"

    end_collection_date_seconds = int(end_collection_date.timestamp())
    if end_collection_date_seconds % (time_precision_minutes * 60) != 0:
        # time precision is a multiple of a day, but the end does not align
        # with this task's buckets
        return f"{end_collection_date} does not align with task aggregation buckets"

    return None
async def collect_task(task, auth_token, hpke_private_key, date):
    """Collects data for the given task through to the given day.

    For tasks with time precision smaller than a day, will collect data for
    aggregations from the day prior to date.
    For tasks with time precision a day or multiple of day, will collect data
    for the aggregation that ends on date.
    If date does not align with the end of an aggregation, it will not collect
    anything.

    Args:
        task: Task record; "task_id", "metric_type" and
            "time_precision_minutes" are read from it here.
        auth_token: Forwarded to collect_many.
        hpke_private_key: Forwarded to collect_many.
        date: ISO-format date string (parsed with datetime.fromisoformat);
            its tzinfo is set to UTC.

    Returns:
        The result of collect_many for the computed window, or the payload
        from build_error_result when the date or the task's time precision
        fails validation.
    """
    end_collection_date = datetime.datetime.fromisoformat(date)
    end_collection_date = end_collection_date.replace(tzinfo=datetime.timezone.utc)
    time_precision_minutes = task["time_precision_minutes"]

    err = check_collection_date(end_collection_date)
    if err is not None:
        return build_error_result(task["task_id"], end_collection_date, task["metric_type"], err)

    err = check_time_precision(time_precision_minutes, end_collection_date)
    if err is not None:
        return build_error_result(task["task_id"], end_collection_date, task["metric_type"], err)

    # task precision and date are valid
    if time_precision_minutes < MINUTES_IN_DAY:
        # time precision is shorter than daily
        # query for the last day of aggregations
        collection_window = datetime.timedelta(days=1)
    else:
        # time precision is a multiple of a day
        # query for the single aggregation that ends at end_collection_date
        collection_window = datetime.timedelta(minutes=time_precision_minutes)
    start_collection_date = end_collection_date - collection_window

    return await collect_many(
        task,
        start_collection_date,
        end_collection_date,
        time_precision_minutes * 60,
        hpke_private_key,
        auth_token,
    )
def ensure_table(bqclient, table_id, schema):
"""Checks if the table exists in BQ and creates it otherwise.