Add an endpoint for activity stream csp report

This commit is contained in:
Nan Jiang 2017-07-28 17:32:32 -04:00
Родитель ed4ddfa41b
Коммит 795c23c14d
2 изменённых файлов: 74 добавлений и 1 удалений

Просмотреть файл

@ -1,4 +1,11 @@
from flask import Blueprint
import ujson
from flask import (
Blueprint,
request,
Response,
)
from onyx.environment import Environment
from onyx.api.v3 import handle_activity_stream_ping
@ -14,5 +21,45 @@ def activity_stream():
return handle_activity_stream_ping("activity_stream", "activity_stream")
@links.route('/activity-stream/csp', methods=['POST'])
@env.statsd.timer('v4_links_activity_stream_csp')
def activity_stream_csp():
"""Log activity stream CSP report sent from Firefox on CSP violations"""
ip_addr = None
ua = None
action = None
try:
client_payload_raw = request.get_data(cache=False)
client_payload = ujson.decode(client_payload_raw)
action = "activity_stream_csp"
# treat it as malformed payload if "csp-report" is missing
# see https://developers.google.com/web/fundamentals/security/csp/#reporting
client_payload = client_payload["csp-report"]
client_payload["action"] = action
ip_addr = request.headers.get('X-Forwarded-For')
if ip_addr is None:
ip_addr = request.remote_addr
ua = request.headers.get('User-Agent')
client_payload["ua"] = ua
client_payload["ip"] = ip_addr
except Exception:
env.log_dict(name="client_error", action="activity_stream_csp_malformed_payload", message={
"ip": ip_addr,
"ua": ua,
"ver": 3,
})
env.statsd.incr("activity_stream_csp_error")
return Response('', content_type='application/json; charset=utf-8', status=400)
env.log_dict(name="activity_stream", action=action, message=client_payload)
env.statsd.incr("activiyt_stream_csp")
return Response('', content_type='application/json; charset=utf-8', status=200)
def register_routes(app):
app.register_blueprint(links)

Просмотреть файл

@ -5,6 +5,7 @@ from tests.base import BaseTestCase
LINK_ACTIVITY_STREAM_V4 = "v4_links.activity_stream"
LINK_ACTIVITY_STREAM_V4_CSP = "v4_links.activity_stream_csp"
class TestActivityStreamPing(BaseTestCase):
@ -30,3 +31,28 @@ class TestActivityStreamPing(BaseTestCase):
data=json.dumps({"data": "test", "action": "activity_stream_session"}))
assert_equals(response.status_code, 200)
assert_equals(response.content_length, 0)
class TestActivityStreamCSP(BaseTestCase):
def test_missing_payload(self):
response = self.client.post(url_for(LINK_ACTIVITY_STREAM_V4_CSP),
content_type='application/json',
headers=[("User-Agent", "TestClient")])
assert_equals(response.status_code, 400)
assert_equals(response.content_length, 0)
def test_junk_payload(self):
response = self.client.post(url_for(LINK_ACTIVITY_STREAM_V4_CSP),
content_type='application/json',
headers=[("User-Agent", "TestClient")],
data='"hfdsfdsjkl"')
assert_equals(response.status_code, 400)
assert_equals(response.content_length, 0)
def test_payload_meta(self):
response = self.client.post(url_for(LINK_ACTIVITY_STREAM_V4_CSP),
content_type='application/json',
headers=[("User-Agent", "TestClient")],
data=json.dumps({"csp-report": {"document-uri": "http://example.com/page.html"}}))
assert_equals(response.status_code, 200)
assert_equals(response.content_length, 0)