# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

import argparse
import glob
import json
import os
import shutil
import threading
import time
import zipfile

import requests

try:
    from urllib.parse import urlencode
    from urllib.request import urlopen, urlretrieve
except ImportError:
    from urllib import urlencode, urlretrieve
    from urllib2 import urlopen

# Use this program to download, extract, and distribute artifact
# files that are to be used for the analyses.

# Only the task group ID absolutely needs to be given. With it, get the task
# details for the entire group, and find all the tests specified with the
# suite, chunk, and mode given through the parser arguments. For each of those
# tests, take the taskId and download the artifact data chunk. Downloads are
# suffixed with an incrementing number, and a JSON mapping from those numbers
# to taskIds is stored for future reference.

# The suite should include the flavor. It makes no sense to aggregate the data
# from multiple flavors together because they don't run the same tests. This
# is also why you cannot specify more than one suite and chunk.
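
# Example invocation (a sketch; the group ID and output path are placeholders):
#   python artifact_downloader.py --task-group-id <TASK_GROUP_ID> \
#       --test-suites-list mochitest-browser-chrome-e10s-2 \
#       --artifact-to-get grcov \
#       --output /tmp/artifact-data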


def artifact_downloader_parser():
    parser = argparse.ArgumentParser(
        description="This tool can download artifact data from a group of "
        + "taskcluster tasks. It then extracts the data, suffixes it with "
        + "a number and then stores it in an output directory."
    )
    parser.add_argument(
        "--task-group-id",
        type=str,
        nargs=1,
        required=True,
        help="The group of tasks that should be parsed to find all the necessary "
        + "data to be used in this analysis.",
    )
    parser.add_argument(
        "--test-suites-list",
        type=str,
        nargs="+",
        help="The list of tests to look at, e.g. mochitest-browser-chrome-e10s-2."
        + " If it's empty we assume that it means nothing; if `all` is given, all"
        + " suites will be processed.",
    )
    parser.add_argument(
        "--artifact-to-get",
        type=str,
        nargs="+",
        default=["grcov"],
        help="Pattern matcher for the artifact you want to download. By default, it"
        + " is set to `grcov` to get ccov artifacts. Use `per_test_coverage` to get data"
        + " from test-coverage tasks.",
    )
    parser.add_argument(
        "--unzip-artifact",
        action="store_true",
        default=False,
        help="Pass this flag if you want the downloaded artifact to be extracted.",
    )
    parser.add_argument(
        "--platform",
        type=str,
        default="test-linux64-ccov",
        help="Platform to obtain data from.",
    )
    parser.add_argument(
        "--download-failures",
        action="store_true",
        default=False,
        help="Set this flag to download data from failed tasks.",
    )
    parser.add_argument(
        "--ingest-continue",
        action="store_true",
        default=False,
        help="Continue from the run that was in progress before.",
    )
    parser.add_argument(
        "--output",
        type=str,
        nargs=1,
        help="This is the directory where all the downloaded, extracted, and suffixed "
        + "data will reside.",
    )
    return parser


# Used to limit the number of concurrent data requests
START_TIME = time.time()
MAX_REQUESTS = 5
CURR_REQS = 0
RETRY = 5
TOTAL_TASKS = 0
CURR_TASK = 0
FAILED = []
ALL_TASKS = []
TC_PREFIX = "https://firefox-ci-tc.services.mozilla.com/api/queue/"

# Fallback download host, used when SECONDARYMETHOD is set (see download_artifact)
SECONDARYMETHOD = False
TC_PREFIX2 = "https://firefoxci.taskcluster-artifacts.net/"


def log(msg):
    global CURR_TASK
    global TOTAL_TASKS
    elapsed_time = time.time() - START_TIME
    val = time.strftime("%H:%M:%S", time.gmtime(elapsed_time))
    pc = round((CURR_TASK / TOTAL_TASKS) * 100, 1) if TOTAL_TASKS else 0
    print(
        "[%s][INFO] %s/%s %s - %s"
        % (val, str(CURR_TASK + 1), str(TOTAL_TASKS), pc, msg)
    )


def warning(msg):
    global CURR_TASK
    global TOTAL_TASKS
    elapsed_time = time.time() - START_TIME
    val = time.strftime("%H:%M:%S", time.gmtime(elapsed_time))
    pc = round((CURR_TASK / TOTAL_TASKS) * 100, 1) if TOTAL_TASKS else 0
    print(
        "[%s][WARNING] %s/%s %s - %s"
        % (val, str(CURR_TASK + 1), str(TOTAL_TASKS), pc, msg)
    )


def get_json(url, params=None):
    if params is not None:
        url += "?" + urlencode(params)

    r = urlopen(url).read().decode("utf-8")

    return json.loads(r)


def get_task_details(task_id):
    task_details = get_json(TC_PREFIX + "v1/task/" + task_id)
    return task_details


def get_task_artifacts(task_id):
    artifacts = get_json(TC_PREFIX + "v1/task/" + task_id + "/artifacts")
    return artifacts["artifacts"]


def get_tasks_in_group(group_id):
    reply = get_json(
        TC_PREFIX + "v1/task-group/" + group_id + "/list", {"limit": "200"}
    )
    tasks = reply["tasks"]
    # Page through the task group until there is no continuation token left
    while "continuationToken" in reply:
        reply = get_json(
            TC_PREFIX + "v1/task-group/" + group_id + "/list",
            {"limit": "200", "continuationToken": reply["continuationToken"]},
        )
        tasks += reply["tasks"]
    return tasks


def download_artifact(task_id, artifact, output_dir):
    global FAILED

    # Downloaded files are named "<taskId>|<artifact basename>" so the task ID
    # can be recovered from the file name later on.
    fname = os.path.join(output_dir, task_id + "|" + os.path.basename(artifact["name"]))
    log("Downloading " + artifact["name"] + " to: " + fname)
    if os.path.exists(fname):
        log("File already exists.")
        return fname

    tries = 0
    if not SECONDARYMETHOD:
        url_data = TC_PREFIX + "v1/task/" + task_id + "/artifacts/" + artifact["name"]
    else:
        url_data = TC_PREFIX2 + task_id + "/0/" + artifact["name"]

    while tries < RETRY:
        try:
            # Make the actual request
            request = requests.get(url_data, timeout=60, stream=True)

            # Open the output file and make sure we write in binary mode
            with open(fname, "wb") as fh:
                # Walk through the request response in chunks of 1024 * 1024 bytes, so 1MiB
                for chunk in request.iter_content(1024 * 1024):
                    # Write the chunk to the file
                    fh.write(chunk)
            break
        except Exception as e:
            log(
                "Failed to get data from %s: %s - %s"
                % (url_data, e.__class__.__name__, e)
            )
            tries += 1
            if tries < RETRY:
                log("Retrying %s more times..." % str(RETRY - tries))
            else:
                warning("No more retries. Failed to download %s" % url_data)
                FAILED.append(task_id)
                raise

    # urlretrieve(
    #     'https://queue.taskcluster.net/v1/task/' + task_id + '/artifacts/' + artifact['name'],
    #     fname
    # )
    return fname
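

# suite_name_from_task_name (below) derives the suite name by taking the part
# after the last "/" and dropping the first "-"-separated token. A sketch of
# the expected transformation (the example task name is an assumption, built
# from the default platform and the suite named in the parser help):
#   "test-linux64-ccov/opt-mochitest-browser-chrome-e10s-2"
#       -> "mochitest-browser-chrome-e10s-2"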


def suite_name_from_task_name(name):
    psn = name.split("/")[-1]
    psn = "-".join(psn.split("-")[1:])
    return psn


def make_count_dir(a_path):
    os.makedirs(a_path, exist_ok=True)
    return a_path


def extract_tgz(tar_url, extract_path="."):
    import tarfile

    with tarfile.open(tar_url, "r") as tar:
        for item in tar:
            tar.extract(item, extract_path)
            if item.name.find(".tgz") != -1 or item.name.find(".tar") != -1:
                # Recurse into nested archives
                extract_tgz(item.name, "./" + item.name[: item.name.rfind("/")])


def unzip_file(abs_zip_path, output_dir, count=0):
    tmp_path = os.path.join(output_dir, str(count))
    if not os.path.exists(tmp_path):
        make_count_dir(tmp_path)
    if abs_zip_path.endswith(".zip"):
        with zipfile.ZipFile(abs_zip_path, "r") as z:
            z.extractall(tmp_path)
    else:
        # Recover the task ID from the "<taskId>|<name>" file naming convention
        task_id = os.path.split(abs_zip_path)[1].split("|")[0]
        extract_tgz(abs_zip_path, tmp_path)
        os.rename(
            os.path.join(tmp_path, "browsertime-results"),
            os.path.join(tmp_path, task_id + "|browsertime-results"),
        )
    return tmp_path


def move_file(abs_filepath, output_dir, count=0):
    tmp_path = os.path.join(output_dir, str(count))
    _, fname = os.path.split(abs_filepath)
    if not os.path.exists(tmp_path):
        make_count_dir(tmp_path)
    if os.path.exists(os.path.join(tmp_path, fname)):
        return tmp_path

    shutil.copyfile(abs_filepath, os.path.join(tmp_path, fname))
    return tmp_path


def artifact_downloader(
    task_group_id,
    output_dir=os.getcwd(),
    test_suites=None,
    download_failures=False,
    artifact_to_get="grcov",
    unzip_artifact=True,
    platform="test-linux64-ccov",
    ingest_continue=False,
):
    global CURR_REQS
    global CURR_TASK
    global TOTAL_TASKS
    global FAILED
    global ALL_TASKS

    # Guard against a None test-suites list (main() passes None when
    # --test-suites-list is not given)
    test_suites = test_suites or []

    head_rev = ""
    all_tasks = False
    if "all" in test_suites:
        all_tasks = True

    # For compatibility
    if type(artifact_to_get) not in (list,):
        artifact_to_get = [artifact_to_get]

    # Make the data directories
    task_dir = os.path.join(output_dir, task_group_id)

    max_num = 0
    if not os.path.exists(task_dir):
        os.makedirs(task_dir, exist_ok=True)
    else:
        # Get the highest run number used so far
        dir_list = next(os.walk(task_dir))[1]
        for subdir in dir_list:
            run_num = int(subdir)
            if run_num > max_num:
                max_num = run_num

    # Continue in the latest run directory, or start a new numbered one
    run_number = max_num if ingest_continue else max_num + 1

    output_dir = os.path.join(task_dir, str(run_number))
    os.makedirs(output_dir, exist_ok=True)

    log("Artifacts will be stored in %s" % output_dir)
    config_json_path = os.path.join(output_dir, "config.json")
    with open(config_json_path, "w") as f:
        json.dump(
            {
                "test_suites": test_suites,
                "platform": platform,
                "artifact": artifact_to_get,
                "download_failures": download_failures,
                "task_group_id": task_group_id,
            },
            f,
            indent=4,
        )

    log("Saved run configuration to %s" % config_json_path)

    log("Getting task group information...")
    tgi_path = os.path.join(task_dir, "task-group-information.json")
    if os.path.exists(tgi_path):
        with open(tgi_path, "r") as f:
            tasks = json.load(f)
    else:
        tasks = get_tasks_in_group(task_group_id)
        with open(tgi_path, "w") as f:
            json.dump(tasks, f, indent=4)
    log("Obtained task group information.")

    # Used to keep track of how many grcov files
    # we are downloading per test.
    task_counters = {}
    taskid_to_file_map = {}

    # For each task in this group
    threads = []
    TOTAL_TASKS = len(tasks)
    for task in tasks:
        download_this_task = False
        # Get the test name
        if platform not in task["task"]["metadata"]["name"]:
            continue
        test_name = suite_name_from_task_name(task["task"]["metadata"]["name"])
        log(
            "Found %s with suite-name: %s"
            % (task["task"]["metadata"]["name"], test_name)
        )

        if (
            task.get("status", {}).get("state", "") in ("failed",)
            and not download_failures
        ):
            log("Skipping failed task")
            continue

        # If all tests weren't asked for but this test is
        # asked for, set the flag.
        if (not all_tasks) and test_name in test_suites:
            download_this_task = True

        if all_tasks or download_this_task:
            if "GECKO_HEAD_REV" in task["task"]["payload"]["env"]:
                # Some tasks are missing this variable
                head_rev = task["task"]["payload"]["env"]["GECKO_HEAD_REV"]

            # Make directories for this task
            grcov_dir = os.path.join(output_dir, test_name)
            downloads_dir = os.path.join(grcov_dir, "downloads")
            data_dir = {
                aname: os.path.join(grcov_dir, aname.replace(".", "") + "_data")
                for aname in artifact_to_get
            }

            if test_name not in task_counters:
                os.makedirs(grcov_dir, exist_ok=True)
                os.makedirs(downloads_dir, exist_ok=True)
                for _, p in data_dir.items():
                    os.makedirs(p, exist_ok=True)
                task_counters[test_name] = 0
            else:
                task_counters[test_name] += 1
            task_id = task["status"]["taskId"]
            ALL_TASKS.append(task_id)

            def get_artifacts(
                task_id,
                downloads_dir,
                data_dir,
                unzip_artifact,
                test_counter,
                test_name,
                artifact_to_get,
                download_failures,
                taskid_to_file_map,
            ):
                global CURR_REQS

                try:

                    def _pattern_match(name, artifacts_to_get):
                        for aname in artifacts_to_get:
                            if aname in name:
                                return aname
                        return None

                    def _check_unzip(filen):
                        return unzip_artifact and (
                            filen.endswith(".zip") or filen.endswith(".tgz")
                        )

                    files = os.listdir(downloads_dir)
                    ffound = [
                        f
                        for f in files
                        if _pattern_match(f, artifact_to_get) and task_id in f
                    ]
                    if ffound:
                        log("File already exists.")
                        CURR_REQS -= 1
                        # There should only be one file found
                        filen = ffound[0]
                        aname = _pattern_match(filen, artifact_to_get)
                        # Use the full path when extracting or copying
                        filen = os.path.join(downloads_dir, filen)

                        if "grcov" in aname or _check_unzip(filen):
                            unzip_file(filen, data_dir[aname], test_counter)
                        else:
                            move_file(filen, data_dir[aname], test_counter)

                        taskid_to_file_map[task_id] = os.path.join(
                            data_dir[aname], str(test_counter)
                        )

                        return filen

                    CURR_REQS += 1
                    log("Getting task artifacts for %s" % task_id)
                    artifacts = get_task_artifacts(task_id)
                    CURR_REQS -= 1

                    # Check if the artifact to get exists before checking for
                    # failures in the task
                    exists = False
                    for artifact in artifacts:
                        if _pattern_match(artifact["name"], artifact_to_get):
                            exists = True
                    if not exists:
                        log("Missing %s in %s" % (artifact_to_get, task_id))
                        CURR_REQS -= 1
                        return

                    if not download_failures:
                        log("Checking for failures on %s" % task_id)
                        failed = None
                        for artifact in artifacts:
                            if "log_error" in artifact["name"]:
                                CURR_REQS += 1
                                filen = download_artifact(
                                    task_id, artifact, downloads_dir
                                )
                                CURR_REQS -= 1
                                if os.stat(filen).st_size != 0:
                                    failed = artifact["name"]
                        if failed:
                            log("Skipping a failed test: " + failed)
                            return

                    for artifact in artifacts:
                        aname = _pattern_match(artifact["name"], artifact_to_get)
                        if aname:
                            CURR_REQS += 1
                            filen = download_artifact(task_id, artifact, downloads_dir)
                            CURR_REQS -= 1

                            if aname == "grcov" or _check_unzip(filen):
                                unzip_file(filen, data_dir[aname], test_counter)
                            else:
                                move_file(filen, data_dir[aname], test_counter)
                            taskid_to_file_map[task_id] = os.path.join(
                                data_dir[aname], str(test_counter)
                            )
                    log("Finished %s for %s" % (task_id, test_name))
                    # Release the request slot claimed when this thread was started
                    CURR_REQS -= 1
                except Exception as e:
                    log("Failed to get artifacts from %s: %s" % (task_id, str(e)))
                    CURR_REQS -= 1
                    return

            CURR_REQS += 1
            log("Artifacts to get: %s" % str(artifact_to_get))
            t = threading.Thread(
                target=get_artifacts,
                args=(
                    task_id,
                    downloads_dir,
                    data_dir,
                    unzip_artifact,
                    task_counters[test_name],
                    test_name,
                    artifact_to_get,
                    download_failures,
                    taskid_to_file_map,
                ),
            )
            t.daemon = True
            t.start()
            threads.append(t)

            # Throttle thread spawning while too many requests are in flight,
            # waiting at most 60 seconds before resetting the counter
            start = time.time()
            while CURR_REQS >= MAX_REQUESTS and time.time() - start < 60:
                time.sleep(1)
                log("Waiting for requests to finish, currently at %s" % str(CURR_REQS))
            if time.time() - start > 60:
                CURR_REQS = 0

        CURR_TASK += 1

    for t in threads:
        t.join()

    with open(os.path.join(output_dir, "taskid_to_file_map.json"), "w") as f:
        json.dump(taskid_to_file_map, f, indent=4)

    log("Finished processing.")
    log(
        "Stats: %s PASSED, %s FAILED, %s TOTAL"
        % (str(len(ALL_TASKS) - len(FAILED)), str(len(FAILED)), str(len(ALL_TASKS)))
    )
    if FAILED:
        log(
            "Tasks that failed to have their artifact downloaded: %s"
            % "\n\t".join(FAILED)
        )

    # Return the directory where all the tasks were downloaded to
    # and split into folders.
    return output_dir, head_rev
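

# artifact_downloader() can also be driven programmatically, as main() does
# below. A minimal sketch (the group ID and output path are placeholders):
#   output_dir, head_rev = artifact_downloader(
#       "<TASK_GROUP_ID>", output_dir="/tmp/artifact-data", test_suites=["all"]
#   )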


def main():
    parser = artifact_downloader_parser()
    args = parser.parse_args()

    task_group_id = args.task_group_id[0]
    test_suites = args.test_suites_list
    artifact_to_get = args.artifact_to_get
    unzip_artifact = args.unzip_artifact
    platform = args.platform
    download_failures = args.download_failures
    ingest_continue = args.ingest_continue
    output_dir = args.output[0] if args.output is not None else os.getcwd()

    task_dir, head_rev = artifact_downloader(
        task_group_id,
        output_dir=output_dir,
        test_suites=test_suites,
        artifact_to_get=artifact_to_get,
        unzip_artifact=unzip_artifact,
        platform=platform,
        download_failures=download_failures,
        ingest_continue=ingest_continue,
    )

    return task_dir


if __name__ == "__main__":
    main()