Add filters to the scanners api endpoint (#14078)

This commit is contained in:
William Durand 2020-04-28 16:28:13 +02:00 коммит произвёл GitHub
Родитель 958bdfb748
Коммит e077b8bea7
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 186 добавлений и 15 удалений

Просмотреть файл

@ -17,6 +17,8 @@ This endpoint returns a list of labelled scanner results.
.. http:get:: /api/v4/scanner/results/
:query string label: Filter by label.
:query string scanner: Filter by scanner name.
:>json int id: The scanner result ID.
:>json string scanner: The scanner name.
:>json string label: Either ``good`` or ``bad``.

Просмотреть файл

@ -1,7 +1,7 @@
from django.conf.urls import url
from .views import ScannerResultViewSet
from .views import ScannerResultView
urlpatterns = [
url(r'^results/$', ScannerResultViewSet.as_view(), name='scanner-results'),
url(r'^results/$', ScannerResultView.as_view(), name='scanner-results')
]

Просмотреть файл

@ -14,7 +14,7 @@ from olympia.scanners.serializers import ScannerResultSerializer
from django.test.utils import override_settings
class TestScannerResultViewSet(TestCase):
class TestScannerResultView(TestCase):
client_class = APITestClient
def setUp(self):
@ -25,6 +25,13 @@ class TestScannerResultViewSet(TestCase):
self.client.login_api(self.user)
self.url = reverse_ns('scanner-results', api_version='v5')
def assert_json_results(self, response, expected_results):
json = response.json()
assert 'results' in json
results = json['results']
assert len(results) == expected_results
return results
def test_endpoint_requires_authentication(self):
self.client.logout_api()
response = self.client.get(self.url)
@ -116,10 +123,7 @@ class TestScannerResultViewSet(TestCase):
response = self.client.get(self.url)
assert response.status_code == 200
json = response.json()
assert 'results' in json
results = json['results']
assert len(results) == 3
results = self.assert_json_results(response, expected_results=3)
# Force a `label` value so that the serialized (expected) data is
# accurate. This is needed because `label` is an annotated field
# created in the QuerySet.
@ -139,3 +143,127 @@ class TestScannerResultViewSet(TestCase):
# created in the QuerySet.
bad_result.label = 'bad'
assert results[2] == ScannerResultSerializer(instance=bad_result).data
def test_get_by_scanner(self):
self.create_switch('enable-scanner-results-api', active=True)
# result labelled as "bad" because its state is TRUE_POSITIVE
bad_version = version_factory(addon=addon_factory())
ScannerResult.objects.create(
scanner=YARA, version=bad_version, state=TRUE_POSITIVE
)
ScannerResult.objects.create(
scanner=CUSTOMS, version=bad_version, state=TRUE_POSITIVE
)
response = self.client.get(self.url)
self.assert_json_results(response, expected_results=2)
response = self.client.get('{}?scanner=yara'.format(self.url))
results = self.assert_json_results(response, expected_results=1)
assert results[0].get('scanner') == 'yara'
response = self.client.get('{}?scanner=customs'.format(self.url))
results = self.assert_json_results(response, expected_results=1)
assert results[0].get('scanner') == 'customs'
def test_get_by_scanner_with_empty_value(self):
self.create_switch('enable-scanner-results-api', active=True)
invalid_scanner = ""
response = self.client.get(
'{}?scanner={}'.format(self.url, invalid_scanner)
)
assert response.status_code == 400
def test_get_by_scanner_with_unknown_scanner(self):
self.create_switch('enable-scanner-results-api', active=True)
invalid_scanner = "yaraaaa"
response = self.client.get(
'{}?scanner={}'.format(self.url, invalid_scanner)
)
assert response.status_code == 400
def test_get_by_label(self):
self.create_switch('enable-scanner-results-api', active=True)
# result labelled as "bad" because its state is TRUE_POSITIVE
bad_version = version_factory(addon=addon_factory())
ScannerResult.objects.create(
scanner=YARA, version=bad_version, state=TRUE_POSITIVE
)
# result labelled as "good" because it has been approved
good_version = version_factory(addon=addon_factory())
ScannerResult.objects.create(scanner=WAT, version=good_version)
VersionLog.objects.create(
activity_log=ActivityLog.create(
action=amo.LOG.APPROVE_VERSION,
version=good_version,
user=self.user,
),
version=good_version,
)
response = self.client.get(self.url)
self.assert_json_results(response, expected_results=2)
response = self.client.get('{}?label=good'.format(self.url))
results = self.assert_json_results(response, expected_results=1)
assert results[0].get('label') == 'good'
response = self.client.get('{}?label=bad'.format(self.url))
results = self.assert_json_results(response, expected_results=1)
assert results[0].get('label') == 'bad'
def test_get_by_label_with_empty_value(self):
self.create_switch('enable-scanner-results-api', active=True)
invalid_label = ""
response = self.client.get(
'{}?label={}'.format(self.url, invalid_label)
)
assert response.status_code == 400
def test_get_by_label_with_unknown_label(self):
self.create_switch('enable-scanner-results-api', active=True)
invalid_label = "gooda"
response = self.client.get(
'{}?label={}'.format(self.url, invalid_label)
)
assert response.status_code == 400
def test_get_by_label_and_scanner(self):
self.create_switch('enable-scanner-results-api', active=True)
# result labelled as "bad" because its state is TRUE_POSITIVE
bad_version = version_factory(addon=addon_factory())
ScannerResult.objects.create(
scanner=YARA, version=bad_version, state=TRUE_POSITIVE
)
# result labelled as "good" because it has been approved
good_version = version_factory(addon=addon_factory())
ScannerResult.objects.create(scanner=WAT, version=good_version)
VersionLog.objects.create(
activity_log=ActivityLog.create(
action=amo.LOG.APPROVE_VERSION,
version=good_version,
user=self.user,
),
version=good_version,
)
response = self.client.get(self.url)
self.assert_json_results(response, expected_results=2)
response = self.client.get(
'{}'.format('{}?scanner=yara&label=good'.format(self.url))
)
self.assert_json_results(response, expected_results=0)
response = self.client.get(
'{}'.format('{}?scanner=yara&label=bad'.format(self.url))
)
self.assert_json_results(response, expected_results=1)
response = self.client.get(
'{}'.format('{}?scanner=wat&label=bad'.format(self.url))
)
self.assert_json_results(response, expected_results=0)
response = self.client.get(
'{}'.format('{}?scanner=wat&label=good'.format(self.url))
)
self.assert_json_results(response, expected_results=1)

Просмотреть файл

@ -4,17 +4,23 @@ from django.conf import settings
from django.db.models import CharField, Value
from django.db.transaction import non_atomic_requests
from django.http import Http404
from rest_framework.exceptions import ParseError
from rest_framework.generics import ListAPIView
from olympia import amo
from olympia.constants.scanners import TRUE_POSITIVE, LABEL_BAD, LABEL_GOOD
from olympia.constants.scanners import (
LABEL_BAD,
LABEL_GOOD,
SCANNERS,
TRUE_POSITIVE,
)
from olympia.api.permissions import GroupPermission
from .models import ScannerResult
from .serializers import ScannerResultSerializer
class ScannerResultViewSet(ListAPIView):
class ScannerResultView(ListAPIView):
permission_classes = [
GroupPermission(amo.permissions.ADMIN_SCANNERS_RESULTS_VIEW)
]
@ -22,9 +28,26 @@ class ScannerResultViewSet(ListAPIView):
serializer_class = ScannerResultSerializer
def get_queryset(self):
label = self.request.query_params.get('label', None)
scanner = next(
(
key
for key in SCANNERS
if SCANNERS.get(key) ==
self.request.query_params.get('scanner')
),
None,
)
bad_results = ScannerResult.objects.exclude(version=None)
good_results = ScannerResult.objects.exclude(version=None)
if scanner:
bad_results = bad_results.filter(scanner=scanner)
good_results = good_results.filter(scanner=scanner)
good_results = (
ScannerResult.objects.exclude(version=None)
.exclude(
good_results.exclude(
version__versionlog__activity_log__user_id=settings.TASK_USER_ID # noqa
)
.filter(
@ -37,21 +60,39 @@ class ScannerResultViewSet(ListAPIView):
.all()
)
bad_results = (
ScannerResult.objects.exclude(version=None)
.filter(state=TRUE_POSITIVE)
bad_results.filter(state=TRUE_POSITIVE)
.annotate(label=Value(LABEL_BAD, output_field=CharField()))
.all()
)
return good_results.union(bad_results).order_by('-created')
queryset = ScannerResult.objects.none()
if not label:
queryset = good_results.union(bad_results)
elif label == LABEL_GOOD:
queryset = good_results
elif label == LABEL_BAD:
queryset = bad_results
return queryset.order_by('-created')
def get(self, request, format=None):
if not waffle.switch_is_active('enable-scanner-results-api'):
raise Http404
label = self.request.query_params.get('label', None)
if label is not None and label not in [LABEL_BAD, LABEL_GOOD]:
raise ParseError("invalid value for label")
scanner = self.request.query_params.get('scanner', None)
if scanner is not None and scanner not in list(SCANNERS.values()):
raise ParseError("invalid value for scanner")
return super().get(request, format)
@classmethod
def as_view(cls, **initkwargs):
"""The API is read-only so we can turn off atomic requests."""
return non_atomic_requests(
super(ScannerResultViewSet, cls).as_view(**initkwargs)
super(ScannerResultView, cls).as_view(**initkwargs)
)