зеркало из
1
0
Форкнуть 0
ghas-jira-integration/ghlib.py

359 строки
9.9 KiB
Python

import requests
import logging
import json
import util
from requests import HTTPError
WEBHOOK_CONFIG = """
{
"url": "{url}",
"content_type": "{content_type}",
"secret": "{secret}",
"insecure_ssl": "{insecure_ssl}",
"events": "{envents}",
"active": "{active}"
}
"""
RESULTS_PER_PAGE = 100
logger = logging.getLogger(__name__)
class GitHub:
def __init__(self, url, token):
self.url = url
self.token = token
def default_headers(self):
auth = {"Authorization": "token " + self.token}
auth.update(util.json_accept_header())
return auth
def getRepository(self, repo_id):
return GHRepository(self, repo_id)
def list_org_hooks(self, org):
"""requires a token with "admin:org_hook" permission!"""
return self.list_hooks_helper(org)
def list_hooks_helper(self, entity):
if "/" in entity:
etype = "repos"
else:
etype = "orgs"
resp = requests.get(
"{api_url}/{etype}/{ename}/hooks?per_page={results_per_page}".format(
api_url=self.url,
etype=etype,
ename=entity,
results_per_page=RESULTS_PER_PAGE,
),
headers=self.default_headers(),
timeout=util.REQUEST_TIMEOUT,
)
while True:
resp.raise_for_status()
for h in resp.json():
yield h
nextpage = resp.links.get("next", {}).get("url", None)
if not nextpage:
break
resp = requests.get(
nextpage, headers=self.default_headers(), timeout=util.REQUEST_TIMEOUT
)
def create_org_hook(
self,
org,
url,
secret,
active=True,
events=["code_scanning_alert", "repository"],
insecure_ssl="0",
content_type="json",
):
return self.create_hook_helper(
org, url, secret, active, events, insecure_ssl, content_type
)
def create_hook_helper(
self,
entity,
url,
secret,
active=True,
events=["code_scanning_alert", "repository"],
insecure_ssl="0",
content_type="json",
):
if "/" in entity:
etype = "repos"
else:
etype = "orgs"
data = json.dumps(
{
"config": {
"url": url,
"insecure_ssl": insecure_ssl,
"secret": secret,
"content_type": content_type,
},
"events": events,
"active": active,
"name": "web",
}
)
resp = requests.post(
"{api_url}/{etype}/{ename}/hooks".format(
etype=etype, ename=entity, api_url=self.url
),
headers=self.default_headers(),
data=data,
timeout=util.REQUEST_TIMEOUT,
)
resp.raise_for_status()
return resp.json()
class GHRepository:
def __init__(self, github, repo_id):
self.gh = github
self.repo_id = repo_id
def list_hooks(self):
return self.gh.list_hooks_helper(self.repo_id)
def create_hook(
self,
url,
secret,
active=True,
events=["code_scanning_alert", "repository"],
insecure_ssl="0",
content_type="json",
):
return self.gh.create_hook_helper(
self.repo_id, url, secret, active, events, insecure_ssl, content_type
)
def get_key(self):
return util.make_key(self.repo_id)
def alerts_helper(self, api_segment, state=None):
if state:
state = "&state=" + state
else:
state = ""
try:
resp = requests.get(
"{api_url}/repos/{repo_id}/{api_segment}/alerts?per_page={results_per_page}{state}".format(
api_url=self.gh.url,
repo_id=self.repo_id,
api_segment=api_segment,
state=state,
results_per_page=RESULTS_PER_PAGE,
),
headers=self.gh.default_headers(),
timeout=util.REQUEST_TIMEOUT,
)
while True:
resp.raise_for_status()
for a in resp.json():
yield a
nextpage = resp.links.get("next", {}).get("url", None)
if not nextpage:
break
resp = requests.get(
nextpage,
headers=self.gh.default_headers(),
timeout=util.REQUEST_TIMEOUT,
)
except HTTPError as httpe:
if httpe.response.status_code == 404:
# A 404 suggests that the repository doesn't exist
# so we return an empty list
pass
else:
# propagate everything else
raise
def get_info(self):
resp = requests.get(
"{api_url}/repos/{repo_id}".format(
api_url=self.gh.url, repo_id=self.repo_id
),
headers=self.gh.default_headers(),
timeout=util.REQUEST_TIMEOUT,
)
resp.raise_for_status()
return resp.json()
def isprivate(self):
return self.get_info()["private"]
def get_alerts(self, state=None):
for a in self.alerts_helper("code-scanning", state):
yield Alert(self, a)
def get_secrets(self, state=None):
# secret scanning alerts are only accessible on private repositories, so
# we return an empty list on public ones
if not self.isprivate():
return
for a in self.alerts_helper("secret-scanning", state):
yield Secret(self, a)
def get_alert(self, alert_num):
resp = requests.get(
"{api_url}/repos/{repo_id}/code-scanning/alerts/{alert_num}".format(
api_url=self.gh.url, repo_id=self.repo_id, alert_num=alert_num
),
headers=self.gh.default_headers(),
timeout=util.REQUEST_TIMEOUT,
)
try:
resp.raise_for_status()
return Alert(self, resp.json())
except HTTPError as httpe:
if httpe.response.status_code == 404:
# A 404 suggests that the alert doesn't exist
return None
else:
# propagate everything else
raise
class AlertBase:
def __init__(self, github_repo, json):
self.github_repo = github_repo
self.gh = github_repo.gh
self.json = json
def get_state(self):
return self.json["state"] == "open"
def get_type(self):
return type(self).__name__
def number(self):
return int(self.json["number"])
def short_desc(self):
raise NotImplementedError
def long_desc(self):
raise NotImplementedError
def hyperlink(self):
return self.json["html_url"]
def can_transition(self):
return True
def get_key(self):
raise NotImplementedError
def adjust_state(self, target_state):
if self.get_state() == target_state:
return
logger.info(
'{action} {atype} {alert_num} of repository "{repo_id}".'.format(
atype=self.get_type(),
action="Reopening" if target_state else "Closing",
alert_num=self.number(),
repo_id=self.github_repo.repo_id,
)
)
self.do_adjust_state(target_state)
class Alert(AlertBase):
def __init__(self, github_repo, json):
AlertBase.__init__(self, github_repo, json)
def can_transition(self):
return self.json["state"] != "fixed"
def long_desc(self):
return self.json["rule"]["description"]
def short_desc(self):
return self.json["rule"]["id"]
def get_key(self):
return util.make_key(self.github_repo.repo_id + "/" + str(self.number()))
def do_adjust_state(self, target_state):
state = "open"
reason = ""
if not target_state:
state = "dismissed"
reason = ', "dismissed_reason": "won\'t fix"'
data = '{{"state": "{state}"{reason}}}'.format(state=state, reason=reason)
resp = requests.patch(
"{api_url}/repos/{repo_id}/code-scanning/alerts/{alert_num}".format(
api_url=self.gh.url,
repo_id=self.github_repo.repo_id,
alert_num=self.number(),
),
data=data,
headers=self.gh.default_headers(),
timeout=util.REQUEST_TIMEOUT,
)
resp.raise_for_status()
class Secret(AlertBase):
def __init__(self, github_repo, json):
AlertBase.__init__(self, github_repo, json)
def can_transition(self):
return True
def long_desc(self):
return self.json["secret_type"]
def short_desc(self):
return self.long_desc()
def get_key(self):
return util.make_key(
self.github_repo.repo_id + "/" + self.get_type() + "/" + str(self.number())
)
def do_adjust_state(self, target_state):
state = "open"
resolution = ""
if not target_state:
state = "resolved"
resolution = ', "resolution": "wont_fix"'
data = '{{"state": "{state}"{resolution}}}'.format(
state=state, resolution=resolution
)
resp = requests.patch(
"{api_url}/repos/{repo_id}/secret-scanning/alerts/{alert_num}".format(
api_url=self.gh.url,
repo_id=self.github_repo.repo_id,
alert_num=self.number(),
),
data=data,
headers=self.gh.default_headers(),
timeout=util.REQUEST_TIMEOUT,
)
resp.raise_for_status()