# File-viewer header (as captured): 145 lines, 5.2 KiB
import os
from flask import Flask, request, jsonify
from flask.logging import default_handler
import json
import hashlib
import hmac
import logging
from datetime import datetime
import util
import jiralib
import ghlib
import threading
# Flask application object; the webhook routes below are registered on it.
app = Flask(__name__)

# Lock taken by the webhook handlers around calls into the sync object.
sync_lock = threading.Lock()

# Per-repository bookkeeping: repo_id -> timestamp of the last full sync.
last_repo_syncs = {}

# Runtime configuration. All three stay None until run_server() fills them in.
sync = None
secret = None
repo_sync_interval = None
def run_server(sync_object, webhook_secret, repository_sync_interval=60 * 60 * 24, port=5000):
    """Store the webhook configuration in module state and start the server.

    Args:
        sync_object: Object the webhook handlers delegate to (they call
            methods such as ``alert_created`` / ``alert_changed`` /
            ``alert_fixed`` on it).
        webhook_secret: Shared secret as ``str``. It is stored UTF-8 encoded
            and used to authenticate incoming webhook requests.
        repository_sync_interval: Minimum number of seconds between two
            repository-wide syncs of the same repository (default: one day).
        port: TCP port the Flask server listens on.

    Note:
        ``app.run`` blocks until the server is shut down.
    """
    global sync, secret, repo_sync_interval

    sync = sync_object
    secret = webhook_secret.encode('utf-8')
    repo_sync_interval = repository_sync_interval

    # The `port` argument was previously accepted but never used and the
    # application was never started; hand it to Flask so the function does
    # what its name and signature promise.
    app.run(port=port)
def auth_is_valid(signature, request_body):
    """Check a webhook signature against the configured secret.

    Args:
        signature: Signature header value as ``str`` (the GitHub handler
            passes the ``X-Hub-Signature-256`` header).
        request_body: Raw request body as ``bytes``.

    Returns:
        ``True`` if ``signature`` equals ``"sha256="`` followed by the
        hex-encoded HMAC-SHA256 of ``request_body`` keyed with the module
        level ``secret``; ``False`` otherwise. Always ``True`` when the Flask
        app runs in debug mode.
    """
    # Signature verification is intentionally bypassed in debug mode.
    if app.debug:
        return True

    expected = 'sha256=' + hmac.new(secret, request_body, hashlib.sha256).hexdigest()
    # compare_digest performs a constant-time comparison of the two values.
    return hmac.compare_digest(signature.encode('utf-8'), expected.encode('utf-8'))
@app.route("/jira", methods=["POST"])
def jira_webhook():
"""Handle POST requests coming from JIRA, and pass a translated request to GitHub"""
# NOTE(review): this copy of the handler has lost its indentation and some of
# its statements (see the notes further down). Restore both before relying on
# it; the comments here only describe what the visible lines do.
#
# Authentication: the `secret_token` query parameter must match the module
# level `secret` (compared with hmac.compare_digest); otherwise respond 403.
if not hmac.compare_digest(request.args.get('secret_token', '').encode('utf-8'), secret):
return jsonify({"code": 403, "error": "Unauthorized"}), 403
# The body is decoded as UTF-8 JSON. `webhookEvent` and
# `issue.fields.description` are read by direct indexing, so a payload that
# lacks either key raises KeyError.
payload = json.loads(request.data.decode('utf-8'))
event = payload['webhookEvent']
desc = payload['issue']['fields']['description']
# parse_alert_info yields a 4-tuple; only repo_id is inspected below, and
# alert_id is unpacked but not used by the visible code.
# NOTE(review): presumably this extracts the alert reference embedded in the
# issue description -- confirm against jiralib.
repo_id, alert_id, _, _ = jiralib.parse_alert_info(desc)
app.logger.debug('Received JIRA webhook for event "{event}"'.format(event=event))
# No repo_id means the issue is not tied to a code scanning alert: acknowledge
# with 200 and do nothing.
if repo_id is None:
app.logger.debug('Ignoring JIRA webhook for issue not related to a code scanning alert.')
return jsonify({}), 200
# The event dispatch happens while holding the module-level lock.
with sync_lock:
# we only care about updates to issues
# NOTE(review): the CREATE / DELETE / UPDATE branches below have no bodies in
# this copy -- the statements they guarded appear to have been lost.
if event == jiralib.CREATE_EVENT:
elif event == jiralib.DELETE_EVENT:
elif event == jiralib.UPDATE_EVENT:
# NOTE(review): the debug log and early return that follow read like the body
# of an `else:` branch (any other event type) whose header is also missing.
app.logger.debug('Ignoring JIRA webhook for event "{event}".'.format(event=event))
return jsonify({}), 200
return jsonify({}), 200
@app.route("/github", methods=["POST"])
def github_webhook():
# NOTE(review): this copy of the handler has lost its indentation and some of
# its statements (see the notes further down). The next three lines are the
# docstring text; its triple-quote delimiters are missing here.
# NOTE(review): the docstring's "no race conditions" remark sits oddly with the
# use of sync_lock below -- confirm how the server is actually run.
Handle POST requests coming from GitHub, and pass a translated request to JIRA
By default, flask runs in single-threaded mode, so we don't need to worry about
any race conditions.
app.logger.debug('Received GITHUB webhook for event "{event}"'.format(event=request.headers.get("X-GitHub-Event", "")))
# Authentication: the X-Hub-Signature-256 header is checked against the raw
# body by auth_is_valid; a missing header is passed as "not-provided".
if not auth_is_valid(request.headers.get("X-Hub-Signature-256", "not-provided"), request.data):
return jsonify({"code": 403, "error": "Unauthorized"}), 403
# When creating a webhook, GitHub will send a 'ping' to check whether the
# instance is up. If we return a friendly code, the hook will mark as green in the UI.
if request.headers.get("X-GitHub-Event", "") == "ping":
return jsonify({}), 200
# repo_id comes from `repository.full_name`, the transition from `action`.
json_dict = request.get_json()
repo_id = json_dict.get("repository", {}).get("full_name")
transition = json_dict.get("action")
# "repository" events: only the 'deleted' action is singled out.
# NOTE(review): the `with sync_lock:` below has no body in this copy, so both
# the statement it protected and the intended nesting of the 400 response that
# follows cannot be determined from here.
if request.headers.get("X-GitHub-Event", "") == "repository":
if transition == 'deleted':
with sync_lock:
return jsonify({"code": 400, "error": "Wrong event type: " + request.headers.get("X-GitHub-Event", "")}), 400
# Every other event type except "code_scanning_alert" is rejected with 400.
if request.headers.get("X-GitHub-Event", "") != "code_scanning_alert":
return jsonify({"code": 400, "error": "Wrong event type: " + request.headers.get("X-GitHub-Event", "")}), 400
# `alert` is read without a default, so a payload without an "alert" (or
# "rule") object raises AttributeError on the lookups below.
alert = json_dict.get("alert")
alert_url = alert.get("html_url")
alert_num = alert.get("number")
# NOTE(review): rule_desc reads the same "id" key as rule_id, which looks like
# a copy-paste slip; neither value is used by the visible code.
rule_id = alert.get("rule").get("id")
rule_desc = alert.get("rule").get("id")
# TODO: We might want to do the following asynchronously, as it could
# take time to do a full sync on a repo with many alerts / issues
# Throttle: act at most once per repo_sync_interval seconds for each repo_id,
# tracked in last_repo_syncs (a repo never seen before counts as synced at 0).
# NOTE(review): the `with sync_lock:` inside this `if` has no body in this
# copy -- the statement it protected appears to have been lost.
last_sync = last_repo_syncs.get(repo_id, 0)
now = datetime.now().timestamp()
if now - last_sync >= repo_sync_interval:
last_repo_syncs[repo_id] = now
with sync_lock:
app.logger.debug('Received GITHUB webhook {action} for {alert_url}'.format(action=transition, alert_url=alert_url))
# we deal with each action type individually, showing the expected
# behaviour and response codes explicitly
with sync_lock:
if transition == "appeared_in_branch":
app.logger.debug('Nothing to do for "appeared_in_branch"')
elif transition == "created":
sync.alert_created(repo_id, alert_num)
elif transition in ["closed_by_user", "reopened_by_user", "reopened"]:
sync.alert_changed(repo_id, alert_num)
elif transition == "fixed":
sync.alert_fixed(repo_id, alert_num)
# when the transition is not recognised, we return a bad request response
# NOTE(review): the `else:` header for this fallback is missing, and the
# returned tuple below is cut off -- its status element and closing
# parenthesis are not present in this copy.
return (
jsonify({"code": 400, "error": "unknown transition type - %s" % transition}),
# All recognised transitions are acknowledged with an empty 200 response.
return jsonify({}), 200