Remove M2M between ScannerQueryResult and ScannerQueryRule, use FK instead (#19800)
* Remove M2M between ScannerQueryResult and ScannerQueryRule, use FK instead For queries, there should be one rule per result only. This allows deletion of query rules to properly delete query results, and in turn avoids the issue where orphaned results would stay around and mess up filtering when only one rule was remaining. Because it's in the way, also move things specific to ScannerResult and ScannerRule that ScannerQueryResult/ScannerQueryRule are not using outside of the common base classes.
This commit is contained in:
Родитель
836e268423
Коммит
d398a4f721
|
@ -2,7 +2,6 @@ from django.conf import settings
|
|||
from django.contrib import admin, messages
|
||||
from django.contrib.admin import SimpleListFilter
|
||||
from django.contrib.admin.views.main import ChangeList, ERROR_FLAG, PAGE_VAR
|
||||
from django.core.exceptions import FieldDoesNotExist
|
||||
from django.db.models import Count, Prefetch
|
||||
from django.http import Http404
|
||||
from django.http.request import QueryDict
|
||||
|
@ -273,35 +272,8 @@ class ScannerResultChangeList(ChangeList):
|
|||
class AbstractScannerResultAdminMixin(admin.ModelAdmin):
|
||||
actions = None
|
||||
view_on_site = False
|
||||
|
||||
list_display = (
|
||||
'id',
|
||||
'formatted_addon',
|
||||
'guid',
|
||||
'authors',
|
||||
'scanner',
|
||||
'formatted_score',
|
||||
'formatted_matched_rules',
|
||||
'formatted_created',
|
||||
'result_actions',
|
||||
)
|
||||
list_select_related = ('version',)
|
||||
raw_id_fields = ('version', 'upload')
|
||||
|
||||
fields = (
|
||||
'id',
|
||||
'upload',
|
||||
'formatted_addon',
|
||||
'authors',
|
||||
'guid',
|
||||
'scanner',
|
||||
'formatted_score',
|
||||
'created',
|
||||
'state',
|
||||
'formatted_matched_rules_with_files',
|
||||
'result_actions',
|
||||
'formatted_results',
|
||||
)
|
||||
raw_id_fields = ('version',)
|
||||
|
||||
ordering = ('-pk',)
|
||||
|
||||
|
@ -326,7 +298,6 @@ class AbstractScannerResultAdminMixin(admin.ModelAdmin):
|
|||
),
|
||||
'version__file',
|
||||
'version__addon__authors',
|
||||
'matched_rules',
|
||||
)
|
||||
|
||||
def get_unfiltered_changelist_params(self):
|
||||
|
@ -370,10 +341,6 @@ class AbstractScannerResultAdminMixin(admin.ModelAdmin):
|
|||
to_exclude = []
|
||||
if not self.has_actions_permission(request):
|
||||
to_exclude = ['result_actions']
|
||||
try:
|
||||
self.model._meta.get_field('upload')
|
||||
except FieldDoesNotExist:
|
||||
to_exclude.append('upload')
|
||||
fields = list(filter(lambda x: x not in to_exclude, fields))
|
||||
return fields
|
||||
|
||||
|
@ -469,8 +436,12 @@ class AbstractScannerResultAdminMixin(admin.ModelAdmin):
|
|||
formatted_results.short_description = 'Results'
|
||||
|
||||
def formatted_matched_rules(self, obj):
|
||||
rule_model = self.model.matched_rules.rel.model
|
||||
info = rule_model._meta.app_label, rule_model._meta.model_name
|
||||
info = obj.rule_model._meta.app_label, obj.rule_model._meta.model_name
|
||||
rules = (
|
||||
[obj.matched_rule]
|
||||
if hasattr(obj, 'matched_rule')
|
||||
else obj.matched_rules.all()
|
||||
)
|
||||
|
||||
return format_html(
|
||||
', '.join(
|
||||
|
@ -480,7 +451,7 @@ class AbstractScannerResultAdminMixin(admin.ModelAdmin):
|
|||
rule.name,
|
||||
rule.get_scanner_display(),
|
||||
)
|
||||
for rule in obj.matched_rules.all()
|
||||
for rule in rules
|
||||
]
|
||||
)
|
||||
)
|
||||
|
@ -491,8 +462,13 @@ class AbstractScannerResultAdminMixin(admin.ModelAdmin):
|
|||
self, obj, template_name='formatted_matched_rules_with_files'
|
||||
):
|
||||
files_by_matched_rules = obj.get_files_by_matched_rules()
|
||||
rule_model = self.model.matched_rules.rel.model
|
||||
info = rule_model._meta.app_label, rule_model._meta.model_name
|
||||
info = obj.rule_model._meta.app_label, obj.rule_model._meta.model_name
|
||||
rules = (
|
||||
[obj.matched_rule]
|
||||
if hasattr(obj, 'matched_rule')
|
||||
else obj.matched_rules.all()
|
||||
)
|
||||
|
||||
return render_to_string(
|
||||
f'admin/scanners/scannerresult/{template_name}.html',
|
||||
{
|
||||
|
@ -505,7 +481,7 @@ class AbstractScannerResultAdminMixin(admin.ModelAdmin):
|
|||
'name': rule.name,
|
||||
'files': files_by_matched_rules[rule.name],
|
||||
}
|
||||
for rule in obj.matched_rules.all()
|
||||
for rule in rules
|
||||
],
|
||||
'addon_id': obj.version.addon.pk if obj.version else None,
|
||||
'version_id': obj.version.pk if obj.version else None,
|
||||
|
@ -514,6 +490,123 @@ class AbstractScannerResultAdminMixin(admin.ModelAdmin):
|
|||
|
||||
formatted_matched_rules_with_files.short_description = 'Matched rules'
|
||||
|
||||
|
||||
class AbstractScannerRuleAdminMixin(admin.ModelAdmin):
|
||||
view_on_site = False
|
||||
|
||||
list_display = ('name', 'scanner', 'action', 'is_active')
|
||||
list_filter = ('scanner', 'action', 'is_active')
|
||||
fields = (
|
||||
'scanner',
|
||||
'name',
|
||||
'action',
|
||||
'created',
|
||||
'modified',
|
||||
'matched_results_link',
|
||||
'is_active',
|
||||
'definition',
|
||||
)
|
||||
readonly_fields = ('created', 'modified', 'matched_results_link')
|
||||
|
||||
def formfield_for_choice_field(self, db_field, request, **kwargs):
|
||||
if db_field.name == 'scanner':
|
||||
kwargs['choices'] = (('', '---------'),)
|
||||
for key, value in db_field.get_choices():
|
||||
if key in [CUSTOMS, YARA]:
|
||||
kwargs['choices'] += ((key, value),)
|
||||
return super().formfield_for_choice_field(db_field, request, **kwargs)
|
||||
|
||||
class Media:
|
||||
css = {'all': ('css/admin/scannerrule.css',)}
|
||||
|
||||
def get_fields(self, request, obj=None):
|
||||
fields = super().get_fields(request, obj)
|
||||
if not self.has_change_permission(request, obj):
|
||||
# Remove the 'definition' field...
|
||||
fields = list(filter(lambda x: x != 'definition', fields))
|
||||
# ...and add its readonly (and pretty!) alter-ego.
|
||||
fields.append('formatted_definition')
|
||||
return fields
|
||||
|
||||
def matched_results_link(self, obj):
|
||||
if not obj.pk or not obj.scanner:
|
||||
return '-'
|
||||
counts = obj.results.aggregate(
|
||||
addons=Count('version__addon', distinct=True), total=Count('id')
|
||||
)
|
||||
ResultModel = obj.results.model
|
||||
url = reverse(
|
||||
'admin:{}_{}_changelist'.format(
|
||||
ResultModel._meta.app_label, ResultModel._meta.model_name
|
||||
)
|
||||
)
|
||||
# The parameter name is called matched_rule or matched_rules depending
|
||||
# on the model, because one of them is a many to many and the other a
|
||||
# simple FK.
|
||||
param_name = 'matched_rule%s__id__exact' % (
|
||||
's' if obj._meta.get_field('results').many_to_many else ''
|
||||
)
|
||||
params = {
|
||||
param_name: str(obj.pk),
|
||||
}
|
||||
result_admin = admin.site._registry[ResultModel]
|
||||
params.update(result_admin.get_unfiltered_changelist_params())
|
||||
return format_html(
|
||||
'<a href="{}?{}">{} ({} add-ons)</a>',
|
||||
url,
|
||||
urlencode(params),
|
||||
counts['total'],
|
||||
counts['addons'],
|
||||
)
|
||||
|
||||
matched_results_link.short_description = 'Matched Results'
|
||||
|
||||
def formatted_definition(self, obj):
|
||||
return format_html('<pre>{}</pre>', obj.definition)
|
||||
|
||||
formatted_definition.short_description = 'Definition'
|
||||
|
||||
|
||||
@admin.register(ScannerResult)
|
||||
class ScannerResultAdmin(AbstractScannerResultAdminMixin, admin.ModelAdmin):
|
||||
fields = (
|
||||
'id',
|
||||
'upload',
|
||||
'formatted_addon',
|
||||
'authors',
|
||||
'guid',
|
||||
'scanner',
|
||||
'formatted_score',
|
||||
'created',
|
||||
'state',
|
||||
'formatted_matched_rules_with_files',
|
||||
'result_actions',
|
||||
'formatted_results',
|
||||
)
|
||||
list_display = (
|
||||
'id',
|
||||
'formatted_addon',
|
||||
'guid',
|
||||
'authors',
|
||||
'scanner',
|
||||
'formatted_score',
|
||||
'formatted_matched_rules',
|
||||
'formatted_created',
|
||||
'result_actions',
|
||||
)
|
||||
list_filter = (
|
||||
'scanner',
|
||||
MatchesFilter,
|
||||
StateFilter,
|
||||
('matched_rules', ScannerRuleListFilter),
|
||||
WithVersionFilter,
|
||||
ExcludeMatchedRulesFilter,
|
||||
)
|
||||
raw_id_fields = AbstractScannerResultAdminMixin.raw_id_fields + ('upload',)
|
||||
|
||||
def get_queryset(self, request):
|
||||
return super().get_queryset(request).prefetch_related('matched_rules')
|
||||
|
||||
def formatted_score(self, obj):
|
||||
if obj.scanner not in [CUSTOMS, MAD]:
|
||||
return '-'
|
||||
|
@ -673,90 +766,18 @@ class AbstractScannerResultAdminMixin(admin.ModelAdmin):
|
|||
result_actions.allow_tags = True
|
||||
|
||||
|
||||
class AbstractScannerRuleAdminMixin(admin.ModelAdmin):
|
||||
view_on_site = False
|
||||
|
||||
list_display = ('name', 'scanner', 'action', 'is_active')
|
||||
list_filter = ('scanner', 'action', 'is_active')
|
||||
fields = (
|
||||
'scanner',
|
||||
'name',
|
||||
'action',
|
||||
'created',
|
||||
'modified',
|
||||
'matched_results_link',
|
||||
'is_active',
|
||||
'definition',
|
||||
)
|
||||
readonly_fields = ('created', 'modified', 'matched_results_link')
|
||||
|
||||
def formfield_for_choice_field(self, db_field, request, **kwargs):
|
||||
if db_field.name == 'scanner':
|
||||
kwargs['choices'] = (('', '---------'),)
|
||||
for key, value in db_field.get_choices():
|
||||
if key in [CUSTOMS, YARA]:
|
||||
kwargs['choices'] += ((key, value),)
|
||||
return super().formfield_for_choice_field(db_field, request, **kwargs)
|
||||
|
||||
class Media:
|
||||
css = {'all': ('css/admin/scannerrule.css',)}
|
||||
|
||||
def get_fields(self, request, obj=None):
|
||||
fields = super().get_fields(request, obj)
|
||||
if not self.has_change_permission(request, obj):
|
||||
# Remove the 'definition' field...
|
||||
fields = list(filter(lambda x: x != 'definition', fields))
|
||||
# ...and add its readonly (and pretty!) alter-ego.
|
||||
fields.append('formatted_definition')
|
||||
return fields
|
||||
|
||||
def matched_results_link(self, obj):
|
||||
if not obj.pk or not obj.scanner:
|
||||
return '-'
|
||||
counts = obj.results.aggregate(
|
||||
addons=Count('version__addon', distinct=True), total=Count('id')
|
||||
)
|
||||
ResultModel = obj.results.model
|
||||
url = reverse(
|
||||
'admin:{}_{}_changelist'.format(
|
||||
ResultModel._meta.app_label, ResultModel._meta.model_name
|
||||
)
|
||||
)
|
||||
params = {
|
||||
'matched_rules__id__exact': str(obj.pk),
|
||||
}
|
||||
result_admin = admin.site._registry[ResultModel]
|
||||
params.update(result_admin.get_unfiltered_changelist_params())
|
||||
return format_html(
|
||||
'<a href="{}?{}">{} ({} add-ons)</a>',
|
||||
url,
|
||||
urlencode(params),
|
||||
counts['total'],
|
||||
counts['addons'],
|
||||
)
|
||||
|
||||
matched_results_link.short_description = 'Matched Results'
|
||||
|
||||
def formatted_definition(self, obj):
|
||||
return format_html('<pre>{}</pre>', obj.definition)
|
||||
|
||||
formatted_definition.short_description = 'Definition'
|
||||
|
||||
|
||||
@admin.register(ScannerResult)
|
||||
class ScannerResultAdmin(AbstractScannerResultAdminMixin, admin.ModelAdmin):
|
||||
list_filter = (
|
||||
'scanner',
|
||||
MatchesFilter,
|
||||
StateFilter,
|
||||
('matched_rules', ScannerRuleListFilter),
|
||||
WithVersionFilter,
|
||||
ExcludeMatchedRulesFilter,
|
||||
)
|
||||
|
||||
|
||||
@admin.register(ScannerQueryResult)
|
||||
class ScannerQueryResultAdmin(AbstractScannerResultAdminMixin, admin.ModelAdmin):
|
||||
fields = (
|
||||
'id',
|
||||
'formatted_addon',
|
||||
'authors',
|
||||
'guid',
|
||||
'scanner',
|
||||
'created',
|
||||
'formatted_matched_rules_with_files',
|
||||
'formatted_results',
|
||||
)
|
||||
raw_id_fields = ('version',)
|
||||
list_display_links = None
|
||||
list_display = (
|
||||
|
@ -774,7 +795,7 @@ class ScannerQueryResultAdmin(AbstractScannerResultAdminMixin, admin.ModelAdmin)
|
|||
'download',
|
||||
)
|
||||
list_filter = (
|
||||
('matched_rules', ScannerRuleListFilter),
|
||||
('matched_rule', ScannerRuleListFilter),
|
||||
('version__channel', VersionChannelFilter),
|
||||
('version__addon__status', AddonStatusFilter),
|
||||
('version__addon__disabled_by_user', AddonVisibilityFilter),
|
||||
|
@ -782,6 +803,9 @@ class ScannerQueryResultAdmin(AbstractScannerResultAdminMixin, admin.ModelAdmin)
|
|||
('version__file__is_signed', FileIsSigned),
|
||||
('was_blocked', admin.BooleanFieldListFilter),
|
||||
)
|
||||
list_select_related = AbstractScannerResultAdminMixin.list_select_related + (
|
||||
'matched_rule',
|
||||
)
|
||||
|
||||
ordering = ('version__addon_id', 'version__channel', 'version__created')
|
||||
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
# Generated by Django 3.2.16 on 2022-10-17 19:00
|
||||
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
|
||||
|
||||
def delete_existing_query_rules_and_results(apps, scheme_editor):
|
||||
# This migration deletes all ScannerQueryRule and ScannerQueryResults. We
|
||||
# shouldn't have many laying around in production and deleting them instead
|
||||
# of updating them to the new schema is a lot simpler: we can directly add
|
||||
# the matched_rule FK without allowing `None`.
|
||||
ScannerQueryRule = apps.get_model('scanners', 'ScannerQueryRule')
|
||||
ScannerQueryResult = apps.get_model('scanners', 'ScannerQueryResult')
|
||||
ScannerQueryRule.objects.all().delete()
|
||||
ScannerQueryResult.objects.all().delete()
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('scanners', '0047_delete_wat_scanner_results'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RunPython(delete_existing_query_rules_and_results),
|
||||
migrations.RemoveField(
|
||||
model_name='scannerquerymatch',
|
||||
name='result',
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name='scannerquerymatch',
|
||||
name='rule',
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name='scannerqueryresult',
|
||||
name='matched_rules',
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='scannerqueryresult',
|
||||
name='matched_rule',
|
||||
field=models.ForeignKey(
|
||||
# Django requires a default in the migration, but it shouldn't
|
||||
# be used as we're deleting all data beforehand.
|
||||
default=None,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
related_name='results',
|
||||
to='scanners.scannerqueryrule',
|
||||
),
|
||||
preserve_default=False,
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='scannerqueryresult',
|
||||
index=models.Index(
|
||||
fields=['was_blocked'], name='scanners_qu_was_blo_7eec1c_idx'
|
||||
),
|
||||
),
|
||||
migrations.RemoveIndex(
|
||||
model_name='scannerqueryresult',
|
||||
name='scanners_qu_has_mat_a6766b_idx',
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name='scannerqueryresult',
|
||||
name='has_matches',
|
||||
),
|
||||
migrations.DeleteModel(
|
||||
name='ScannerQueryMatch',
|
||||
),
|
||||
]
|
|
@ -0,0 +1,29 @@
|
|||
# Generated by Django 3.2.16 on 2022-10-18 16:35
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('scanners', '0048_auto_20221017_1900'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RemoveIndex(
|
||||
model_name='scannerqueryresult',
|
||||
name='scanners_qu_state_32d49e_idx',
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name='scannerqueryresult',
|
||||
name='state',
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name='scannerqueryrule',
|
||||
name='action',
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name='scannerqueryrule',
|
||||
name='is_active',
|
||||
),
|
||||
]
|
|
@ -9,6 +9,7 @@ import yara
|
|||
from django.conf import settings
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.db import models
|
||||
from django.utils.functional import classproperty
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
import olympia.core.logger
|
||||
|
@ -56,10 +57,6 @@ class AbstractScannerResult(ModelBase):
|
|||
# Store the "raw" results of a scanner.
|
||||
results = models.JSONField(default=list)
|
||||
scanner = models.PositiveSmallIntegerField(choices=SCANNERS.items())
|
||||
has_matches = models.BooleanField(null=True)
|
||||
state = models.PositiveSmallIntegerField(
|
||||
choices=RESULT_STATES.items(), null=True, blank=True, default=UNKNOWN
|
||||
)
|
||||
version = models.ForeignKey(
|
||||
'versions.Version',
|
||||
related_name='%(class)ss',
|
||||
|
@ -69,10 +66,6 @@ class AbstractScannerResult(ModelBase):
|
|||
|
||||
class Meta(ModelBase.Meta):
|
||||
abstract = True
|
||||
indexes = [
|
||||
models.Index(fields=('has_matches',)),
|
||||
models.Index(fields=('state',)),
|
||||
]
|
||||
|
||||
def add_yara_result(self, rule, tags=None, meta=None):
|
||||
"""This method is used to store a Yara result."""
|
||||
|
@ -88,20 +81,18 @@ class AbstractScannerResult(ModelBase):
|
|||
# We do not have support for the remaining scanners (yet).
|
||||
return []
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
rule_model = self._meta.get_field('matched_rules').related_model
|
||||
matched_rules = rule_model.objects.filter(
|
||||
def get_rules_queryset(self):
|
||||
"""Helper to return the rule(s) queryset matching the rule name and
|
||||
scanner for this result."""
|
||||
return self.rule_model.objects.filter(
|
||||
scanner=self.scanner,
|
||||
name__in=self.extract_rule_names(),
|
||||
# See: https://github.com/mozilla/addons-server/issues/13143
|
||||
is_active=True,
|
||||
)
|
||||
self.has_matches = bool(matched_rules)
|
||||
# Save the instance first...
|
||||
super().save(*args, **kwargs)
|
||||
# ...then add the associated rules.
|
||||
for scanner_rule in matched_rules:
|
||||
self.matched_rules.add(scanner_rule)
|
||||
|
||||
@classproperty
|
||||
def rule_model(self):
|
||||
# Implement in concrete models.
|
||||
raise NotImplementedError
|
||||
|
||||
def get_scanner_name(self):
|
||||
return SCANNERS.get(self.scanner)
|
||||
|
@ -122,100 +113,11 @@ class AbstractScannerResult(ModelBase):
|
|||
res[ruleId].append(filename)
|
||||
return res
|
||||
|
||||
def can_report_feedback(self):
|
||||
return self.state == UNKNOWN and self.scanner not in [MAD]
|
||||
|
||||
def can_revert_feedback(self):
|
||||
return self.state != UNKNOWN and self.scanner not in [MAD]
|
||||
|
||||
def get_git_repository(self):
|
||||
return {
|
||||
CUSTOMS: settings.CUSTOMS_GIT_REPOSITORY,
|
||||
}.get(self.scanner)
|
||||
|
||||
@classmethod
|
||||
def run_action(cls, version):
|
||||
"""Try to find and execute an action for a given version, based on the
|
||||
scanner results and associated rules.
|
||||
|
||||
If an action is found, it is run synchronously from this method, not in
|
||||
a task.
|
||||
"""
|
||||
log.info('Checking rules and actions for version %s.', version.pk)
|
||||
|
||||
if version.addon.type != ADDON_EXTENSION:
|
||||
log.info(
|
||||
'Not running action(s) on version %s which belongs to a non-extension.',
|
||||
version.pk,
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
mad_result = cls.objects.filter(version=version, scanner=MAD).get()
|
||||
customs = mad_result.results.get('scanners', {}).get('customs', {})
|
||||
customs_score = customs.get('score', 0.5)
|
||||
customs_models_agree = customs.get('result_details', {}).get(
|
||||
'models_agree', True
|
||||
)
|
||||
|
||||
if (
|
||||
customs_score <= 0.01
|
||||
or customs_score >= 0.99
|
||||
or not customs_models_agree
|
||||
):
|
||||
log.info('Flagging version %s for human review by MAD.', version.pk)
|
||||
_flag_for_human_review_by_scanner(version, MAD)
|
||||
except cls.DoesNotExist:
|
||||
log.info('No MAD scanner result for version %s.', version.pk)
|
||||
pass
|
||||
|
||||
rule_model = cls.matched_rules.rel.model
|
||||
result_query_name = cls._meta.get_field('matched_rules').related_query_name()
|
||||
|
||||
rule = (
|
||||
rule_model.objects.filter(
|
||||
**{f'{result_query_name}__version': version, 'is_active': True}
|
||||
)
|
||||
.order_by(
|
||||
# The `-` sign means descending order.
|
||||
'-action'
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not rule:
|
||||
log.info('No action to execute for version %s.', version.pk)
|
||||
return
|
||||
|
||||
action_id = rule.action
|
||||
action_name = ACTIONS.get(action_id, None)
|
||||
|
||||
if not action_name:
|
||||
raise Exception('invalid action %s' % action_id)
|
||||
|
||||
ACTION_FUNCTIONS = {
|
||||
NO_ACTION: _no_action,
|
||||
FLAG_FOR_HUMAN_REVIEW: _flag_for_human_review,
|
||||
DELAY_AUTO_APPROVAL: _delay_auto_approval,
|
||||
DELAY_AUTO_APPROVAL_INDEFINITELY: _delay_auto_approval_indefinitely,
|
||||
DELAY_AUTO_APPROVAL_INDEFINITELY_AND_RESTRICT: (
|
||||
_delay_auto_approval_indefinitely_and_restrict
|
||||
),
|
||||
DELAY_AUTO_APPROVAL_INDEFINITELY_AND_RESTRICT_FUTURE_APPROVALS: (
|
||||
_delay_auto_approval_indefinitely_and_restrict_future_approvals
|
||||
),
|
||||
}
|
||||
|
||||
action_function = ACTION_FUNCTIONS.get(action_id, None)
|
||||
|
||||
if not action_function:
|
||||
raise Exception('no implementation for action %s' % action_id)
|
||||
|
||||
# We have a valid action to execute, so let's do it!
|
||||
log.info('Starting action "%s" for version %s.', action_name, version.pk)
|
||||
action_function(version)
|
||||
log.info('Ending action "%s" for version %s.', action_name, version.pk)
|
||||
|
||||
|
||||
class AbstractScannerRule(ModelBase):
|
||||
name = models.CharField(
|
||||
|
@ -223,16 +125,6 @@ class AbstractScannerRule(ModelBase):
|
|||
help_text=_('This is the exact name of the rule used by a scanner.'),
|
||||
)
|
||||
scanner = models.PositiveSmallIntegerField(choices=SCANNERS.items())
|
||||
action = models.PositiveSmallIntegerField(
|
||||
choices=ACTIONS.items(), default=NO_ACTION
|
||||
)
|
||||
is_active = models.BooleanField(
|
||||
default=True,
|
||||
help_text=_(
|
||||
'When unchecked, the scanner results will not be bound to this '
|
||||
'rule and the action will not be executed.'
|
||||
),
|
||||
)
|
||||
definition = models.TextField(null=True, blank=True)
|
||||
|
||||
class Meta(ModelBase.Meta):
|
||||
|
@ -295,6 +187,17 @@ class AbstractScannerRule(ModelBase):
|
|||
|
||||
|
||||
class ScannerRule(AbstractScannerRule):
|
||||
action = models.PositiveSmallIntegerField(
|
||||
choices=ACTIONS.items(), default=NO_ACTION
|
||||
)
|
||||
is_active = models.BooleanField(
|
||||
default=True,
|
||||
help_text=_(
|
||||
'When unchecked, the scanner results will not be bound to this '
|
||||
'rule and the action will not be executed.'
|
||||
),
|
||||
)
|
||||
|
||||
class Meta(AbstractScannerRule.Meta):
|
||||
db_table = 'scanners_rules'
|
||||
|
||||
|
@ -315,6 +218,10 @@ class ScannerResult(AbstractScannerResult):
|
|||
null=True, blank=True, max_digits=6, decimal_places=5, default=-1
|
||||
)
|
||||
model_version = models.CharField(max_length=30, null=True)
|
||||
has_matches = models.BooleanField(null=True)
|
||||
state = models.PositiveSmallIntegerField(
|
||||
choices=RESULT_STATES.items(), null=True, blank=True, default=UNKNOWN
|
||||
)
|
||||
|
||||
class Meta(AbstractScannerResult.Meta):
|
||||
db_table = 'scanners_results'
|
||||
|
@ -324,6 +231,115 @@ class ScannerResult(AbstractScannerResult):
|
|||
name='scanners_results_upload_id_scanner_version_id_ad9eb8a6_uniq',
|
||||
)
|
||||
]
|
||||
indexes = [
|
||||
models.Index(fields=('state',)),
|
||||
models.Index(fields=('has_matches',)),
|
||||
]
|
||||
|
||||
@classproperty
|
||||
def rule_model(self):
|
||||
return self.matched_rules.rel.model
|
||||
|
||||
def get_rules_queryset(self):
|
||||
# See: https://github.com/mozilla/addons-server/issues/13143
|
||||
return super().get_rules_queryset().filter(is_active=True)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
matched_rules = self.get_rules_queryset()
|
||||
self.has_matches = bool(matched_rules)
|
||||
# Save the instance first...
|
||||
super().save(*args, **kwargs)
|
||||
# ...then add the associated rules.
|
||||
for scanner_rule in matched_rules:
|
||||
self.matched_rules.add(scanner_rule)
|
||||
|
||||
def can_report_feedback(self):
|
||||
return self.state == UNKNOWN and self.scanner not in [MAD]
|
||||
|
||||
def can_revert_feedback(self):
|
||||
return self.state != UNKNOWN and self.scanner not in [MAD]
|
||||
|
||||
@classmethod
|
||||
def run_action(cls, version):
|
||||
"""Try to find and execute an action for a given version, based on the
|
||||
scanner results and associated rules.
|
||||
|
||||
If an action is found, it is run synchronously from this method, not in
|
||||
a task.
|
||||
"""
|
||||
log.info('Checking rules and actions for version %s.', version.pk)
|
||||
|
||||
if version.addon.type != ADDON_EXTENSION:
|
||||
log.info(
|
||||
'Not running action(s) on version %s which belongs to a non-extension.',
|
||||
version.pk,
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
mad_result = cls.objects.filter(version=version, scanner=MAD).get()
|
||||
customs = mad_result.results.get('scanners', {}).get('customs', {})
|
||||
customs_score = customs.get('score', 0.5)
|
||||
customs_models_agree = customs.get('result_details', {}).get(
|
||||
'models_agree', True
|
||||
)
|
||||
|
||||
if (
|
||||
customs_score <= 0.01
|
||||
or customs_score >= 0.99
|
||||
or not customs_models_agree
|
||||
):
|
||||
log.info('Flagging version %s for human review by MAD.', version.pk)
|
||||
_flag_for_human_review_by_scanner(version, MAD)
|
||||
except cls.DoesNotExist:
|
||||
log.info('No MAD scanner result for version %s.', version.pk)
|
||||
pass
|
||||
|
||||
result_query_name = cls._meta.get_field('matched_rules').related_query_name()
|
||||
|
||||
rule = (
|
||||
cls.rule_model.objects.filter(
|
||||
**{f'{result_query_name}__version': version, 'is_active': True}
|
||||
)
|
||||
.order_by(
|
||||
# The `-` sign means descending order.
|
||||
'-action'
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not rule:
|
||||
log.info('No action to execute for version %s.', version.pk)
|
||||
return
|
||||
|
||||
action_id = rule.action
|
||||
action_name = ACTIONS.get(action_id, None)
|
||||
|
||||
if not action_name:
|
||||
raise Exception('invalid action %s' % action_id)
|
||||
|
||||
ACTION_FUNCTIONS = {
|
||||
NO_ACTION: _no_action,
|
||||
FLAG_FOR_HUMAN_REVIEW: _flag_for_human_review,
|
||||
DELAY_AUTO_APPROVAL: _delay_auto_approval,
|
||||
DELAY_AUTO_APPROVAL_INDEFINITELY: _delay_auto_approval_indefinitely,
|
||||
DELAY_AUTO_APPROVAL_INDEFINITELY_AND_RESTRICT: (
|
||||
_delay_auto_approval_indefinitely_and_restrict
|
||||
),
|
||||
DELAY_AUTO_APPROVAL_INDEFINITELY_AND_RESTRICT_FUTURE_APPROVALS: (
|
||||
_delay_auto_approval_indefinitely_and_restrict_future_approvals
|
||||
),
|
||||
}
|
||||
|
||||
action_function = ACTION_FUNCTIONS.get(action_id, None)
|
||||
|
||||
if not action_function:
|
||||
raise Exception('no implementation for action %s' % action_id)
|
||||
|
||||
# We have a valid action to execute, so let's do it!
|
||||
log.info('Starting action "%s" for version %s.', action_name, version.pk)
|
||||
action_function(version)
|
||||
log.info('Ending action "%s" for version %s.', action_name, version.pk)
|
||||
|
||||
|
||||
class ScannerMatch(ModelBase):
|
||||
|
@ -402,17 +418,25 @@ class ScannerQueryRule(AbstractScannerRule):
|
|||
|
||||
|
||||
class ScannerQueryResult(AbstractScannerResult):
|
||||
# Has to be overridden, because the parent refers to ScannerMatch.
|
||||
matched_rules = models.ManyToManyField(
|
||||
'ScannerQueryRule', through='ScannerQueryMatch', related_name='results'
|
||||
# Note: ScannerResult uses a M2M called 'matched_rules', but here we don't
|
||||
# need a M2M because there will always be a single rule for each result, so
|
||||
# we have a single FK called 'matched_rule' (singular).
|
||||
matched_rule = models.ForeignKey(
|
||||
ScannerQueryRule, on_delete=models.CASCADE, related_name='results'
|
||||
)
|
||||
was_blocked = models.BooleanField(null=True, default=None)
|
||||
|
||||
class Meta(AbstractScannerResult.Meta):
|
||||
db_table = 'scanners_query_results'
|
||||
# FIXME indexes, unique constraints ?
|
||||
indexes = [
|
||||
models.Index(fields=('was_blocked',)),
|
||||
]
|
||||
|
||||
@classproperty
|
||||
def rule_model(cls):
|
||||
return cls.matched_rule.field.related_model
|
||||
|
||||
class ScannerQueryMatch(ModelBase):
|
||||
result = models.ForeignKey(ScannerQueryResult, on_delete=models.CASCADE)
|
||||
rule = models.ForeignKey(ScannerQueryRule, on_delete=models.CASCADE)
|
||||
def save(self, *args, **kwargs):
|
||||
if self.results:
|
||||
self.matched_rule = self.get_rules_queryset().get()
|
||||
super().save(*args, **kwargs)
|
||||
|
|
|
@ -1161,7 +1161,7 @@ class TestScannerQueryRuleAdmin(TestCase):
|
|||
link = doc('.field-matched_results_link a')
|
||||
assert link
|
||||
results_list_url = reverse('admin:scanners_scannerqueryresult_changelist')
|
||||
expected_href = f'{results_list_url}?matched_rules__id__exact={rule.pk}'
|
||||
expected_href = f'{results_list_url}?matched_rule__id__exact={rule.pk}'
|
||||
assert link.attr('href') == expected_href
|
||||
assert link.text() == '3 (2 add-ons)'
|
||||
|
||||
|
@ -1432,17 +1432,22 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
model=ScannerQueryResult, admin_site=AdminSite()
|
||||
)
|
||||
|
||||
self.rule = ScannerQueryRule.objects.create(name='myrule', scanner=YARA)
|
||||
|
||||
def scanner_query_result_factory(self, *args, **kwargs):
|
||||
kwargs.setdefault('scanner', YARA)
|
||||
result = ScannerQueryResult(*args, **kwargs)
|
||||
if 'rule' not in kwargs and 'results' not in kwargs:
|
||||
result.add_yara_result(rule=self.rule.name)
|
||||
result.save()
|
||||
return result
|
||||
|
||||
def test_list_view(self):
|
||||
addon = addon_factory()
|
||||
addon.update(average_daily_users=999)
|
||||
addon.authors.add(user_factory(email='foo@bar.com'))
|
||||
addon.authors.add(user_factory(email='bar@foo.com'))
|
||||
rule = ScannerQueryRule.objects.create(name='rule', scanner=YARA)
|
||||
result = ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=addon.current_version
|
||||
)
|
||||
result.add_yara_result(rule=rule.name)
|
||||
result.save()
|
||||
result = self.scanner_query_result_factory(version=addon.current_version)
|
||||
response = self.client.get(self.list_url)
|
||||
assert response.status_code == 200
|
||||
html = pq(response.content)
|
||||
|
@ -1483,12 +1488,7 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
assert '/icon-yes.svg' in html('.field-is_file_signed img')[0].attrib['src']
|
||||
|
||||
def test_list_view_no_query_permissions(self):
|
||||
rule = ScannerQueryRule.objects.create(name='rule', scanner=YARA)
|
||||
result = ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=addon_factory().current_version
|
||||
)
|
||||
result.add_yara_result(rule=rule.name)
|
||||
result.save()
|
||||
self.scanner_query_result_factory(version=addon_factory().current_version)
|
||||
|
||||
self.user = user_factory(email='somebodyelse@mozilla.com')
|
||||
# Give the user permission to edit ScannersResults, but not
|
||||
|
@ -1513,8 +1513,9 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
doc = pq(response.content)
|
||||
expected = [
|
||||
('All', '?'),
|
||||
('bar (yara)', f'?matched_rules__id__exact={rule_bar.pk}'),
|
||||
('foo (yara)', f'?matched_rules__id__exact={rule_foo.pk}'),
|
||||
('bar (yara)', f'?matched_rule__id__exact={rule_bar.pk}'),
|
||||
('foo (yara)', f'?matched_rule__id__exact={rule_foo.pk}'),
|
||||
('myrule (yara)', f'?matched_rule__id__exact={self.rule.pk}'),
|
||||
('All', '?'),
|
||||
('Unlisted', '?version__channel__exact=1'),
|
||||
('Listed', '?version__channel__exact=2'),
|
||||
|
@ -1555,7 +1556,7 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
response = self.client.get(
|
||||
self.list_url,
|
||||
{
|
||||
'matched_rules__id__exact': rule_bar.pk,
|
||||
'matched_rule__id__exact': rule_bar.pk,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
@ -1565,13 +1566,11 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
|
||||
def test_list_filter_channel(self):
|
||||
addon = addon_factory()
|
||||
ScannerQueryResult.objects.create(scanner=YARA, version=addon.versions.all()[0])
|
||||
self.scanner_query_result_factory(version=addon.versions.get())
|
||||
unlisted_addon = addon_factory(
|
||||
version_kw={'channel': amo.CHANNEL_UNLISTED}, status=amo.STATUS_NULL
|
||||
)
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=unlisted_addon.versions.all()[0]
|
||||
)
|
||||
self.scanner_query_result_factory(version=unlisted_addon.versions.get())
|
||||
|
||||
response = self.client.get(
|
||||
self.list_url,
|
||||
|
@ -1597,13 +1596,9 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
|
||||
def test_list_filter_addon_status(self):
|
||||
incomplete_addon = addon_factory(status=amo.STATUS_NULL)
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=incomplete_addon.versions.all()[0]
|
||||
)
|
||||
self.scanner_query_result_factory(version=incomplete_addon.versions.get())
|
||||
deleted_addon = addon_factory(status=amo.STATUS_DELETED)
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=deleted_addon.versions.all()[0]
|
||||
)
|
||||
self.scanner_query_result_factory(version=deleted_addon.versions.get())
|
||||
|
||||
response = self.client.get(
|
||||
self.list_url,
|
||||
|
@ -1629,13 +1624,9 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
|
||||
def test_list_filter_addon_visibility(self):
|
||||
visible_addon = addon_factory()
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=visible_addon.versions.all()[0]
|
||||
)
|
||||
self.scanner_query_result_factory(version=visible_addon.versions.get())
|
||||
invisible_addon = addon_factory(disabled_by_user=True)
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=invisible_addon.versions.all()[0]
|
||||
)
|
||||
self.scanner_query_result_factory(version=invisible_addon.versions.get())
|
||||
|
||||
response = self.client.get(
|
||||
self.list_url,
|
||||
|
@ -1664,11 +1655,9 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
disabled_file_version = version_factory(
|
||||
addon=addon_disabled_file, file_kw={'status': amo.STATUS_DISABLED}
|
||||
)
|
||||
ScannerQueryResult.objects.create(scanner=YARA, version=disabled_file_version)
|
||||
self.scanner_query_result_factory(version=disabled_file_version)
|
||||
addon_approved_file = addon_factory()
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=addon_approved_file.versions.all()[0]
|
||||
)
|
||||
self.scanner_query_result_factory(version=addon_approved_file.versions.get())
|
||||
|
||||
response = self.client.get(
|
||||
self.list_url,
|
||||
|
@ -1694,13 +1683,9 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
|
||||
def test_list_filter_file_is_signed(self):
|
||||
signed_addon = addon_factory(file_kw={'is_signed': True})
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=signed_addon.versions.all()[0]
|
||||
)
|
||||
self.scanner_query_result_factory(version=signed_addon.versions.get())
|
||||
unsigned_addon = addon_factory(file_kw={'is_signed': False})
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=unsigned_addon.versions.all()[0]
|
||||
)
|
||||
self.scanner_query_result_factory(version=unsigned_addon.versions.get())
|
||||
|
||||
response = self.client.get(
|
||||
self.list_url,
|
||||
|
@ -1726,20 +1711,16 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
|
||||
def test_list_filter_was_blocked(self):
|
||||
was_blocked_addon = addon_factory()
|
||||
self.scanner_query_result_factory(
|
||||
version=was_blocked_addon.versions.get(), was_blocked=True
|
||||
)
|
||||
was_blocked_unknown_addon = addon_factory()
|
||||
self.scanner_query_result_factory(
|
||||
version=was_blocked_unknown_addon.versions.get(), was_blocked=None
|
||||
)
|
||||
was_blocked_false_addon = addon_factory()
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=was_blocked_addon.current_version, was_blocked=True
|
||||
)
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA,
|
||||
version=was_blocked_unknown_addon.current_version,
|
||||
was_blocked=None,
|
||||
)
|
||||
ScannerQueryResult.objects.create(
|
||||
scanner=YARA,
|
||||
version=was_blocked_false_addon.current_version,
|
||||
was_blocked=False,
|
||||
self.scanner_query_result_factory(
|
||||
version=was_blocked_false_addon.versions.get(), was_blocked=False
|
||||
)
|
||||
|
||||
response = self.client.get(
|
||||
|
@ -1776,20 +1757,19 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
assert doc('.field-guid').text() == was_blocked_unknown_addon.guid
|
||||
|
||||
def test_change_page(self):
|
||||
rule = ScannerQueryRule.objects.create(name='darule', scanner=YARA)
|
||||
result = ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=addon_factory().current_version
|
||||
result = self.scanner_query_result_factory(
|
||||
version=addon_factory().current_version
|
||||
)
|
||||
result.add_yara_result(rule=rule.name)
|
||||
result.save()
|
||||
url = reverse('admin:scanners_scannerqueryresult_change', args=(result.pk,))
|
||||
response = self.client.get(url)
|
||||
assert response.status_code == 200
|
||||
|
||||
rule_url = reverse('admin:scanners_scannerqueryrule_change', args=(rule.pk,))
|
||||
rule_url = reverse(
|
||||
'admin:scanners_scannerqueryrule_change', args=(self.rule.pk,)
|
||||
)
|
||||
doc = pq(response.content)
|
||||
link = doc('.field-formatted_matched_rules_with_files td a')
|
||||
assert link.text() == 'darule ???'
|
||||
assert link.text() == 'myrule ???'
|
||||
assert link.attr('href') == rule_url
|
||||
|
||||
link_response = self.client.get(rule_url)
|
||||
|
@ -1801,12 +1781,9 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
# ScannerQueryResults.
|
||||
self.grant_permission(self.user, 'Admin:ScannersResultsEdit')
|
||||
self.client.force_login(self.user)
|
||||
rule = ScannerQueryRule.objects.create(name='darule', scanner=YARA)
|
||||
result = ScannerQueryResult.objects.create(
|
||||
scanner=YARA, version=addon_factory().current_version
|
||||
result = self.scanner_query_result_factory(
|
||||
version=addon_factory().current_version
|
||||
)
|
||||
result.add_yara_result(rule=rule.name)
|
||||
result.save()
|
||||
url = reverse('admin:scanners_scannerqueryresult_change', args=(result.pk,))
|
||||
response = self.client.get(url)
|
||||
assert response.status_code == 403
|
||||
|
@ -1819,7 +1796,7 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
|
||||
def test_formatted_matched_rules_with_files(self):
|
||||
version = addon_factory().current_version
|
||||
result = ScannerQueryResult.objects.create(scanner=YARA, version=version)
|
||||
result = ScannerQueryResult(scanner=YARA, version=version)
|
||||
rule = ScannerQueryRule.objects.create(name='bar', scanner=YARA)
|
||||
filename = 'some/file.js'
|
||||
result.add_yara_result(rule=rule.name, meta={'filename': filename})
|
||||
|
@ -1840,7 +1817,7 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
rule = ScannerQueryRule.objects.create(
|
||||
name='foo', scanner=YARA, created=self.days_ago(2)
|
||||
)
|
||||
result1 = ScannerQueryResult.objects.create(
|
||||
result1 = ScannerQueryResult(
|
||||
scanner=YARA, version=addon_factory().current_version
|
||||
)
|
||||
result1.add_yara_result(
|
||||
|
@ -1850,7 +1827,7 @@ class TestScannerQueryResultAdmin(TestCase):
|
|||
rule=rule.name, meta={'filename': 'another/file/somewhereelse.js'}
|
||||
)
|
||||
result1.save()
|
||||
result2 = ScannerQueryResult.objects.create(
|
||||
result2 = ScannerQueryResult(
|
||||
scanner=YARA,
|
||||
version=addon_factory().current_version,
|
||||
created=self.days_ago(1),
|
||||
|
|
|
@ -43,11 +43,14 @@ class FakeYaraMatch:
|
|||
class TestScannerResultMixin:
|
||||
__test__ = False
|
||||
|
||||
def create_result(self, *args, **kwargs):
|
||||
return self.model.objects.create(*args, **kwargs)
|
||||
|
||||
def create_customs_result(self):
|
||||
return self.model.objects.create(scanner=CUSTOMS)
|
||||
return self.create_result(scanner=CUSTOMS)
|
||||
|
||||
def create_mad_result(self):
|
||||
return self.model.objects.create(scanner=MAD)
|
||||
return self.create_result(scanner=MAD)
|
||||
|
||||
def create_fake_yara_match(
|
||||
self,
|
||||
|
@ -66,7 +69,7 @@ class TestScannerResultMixin:
|
|||
)
|
||||
|
||||
def create_yara_result(self):
|
||||
return self.model.objects.create(scanner=YARA)
|
||||
return self.create_result(scanner=YARA)
|
||||
|
||||
def test_add_yara_result(self):
|
||||
result = self.create_yara_result()
|
||||
|
@ -78,32 +81,6 @@ class TestScannerResultMixin:
|
|||
{'rule': match.rule, 'tags': match.tags, 'meta': match.meta}
|
||||
]
|
||||
|
||||
def test_save_set_has_matches(self):
|
||||
result = self.create_yara_result()
|
||||
rule = self.rule_model.objects.create(
|
||||
name='some rule name', scanner=result.scanner
|
||||
)
|
||||
|
||||
result.has_matches = None
|
||||
result.save()
|
||||
assert result.has_matches is False
|
||||
|
||||
result.has_matches = None
|
||||
result.results = [{'rule': rule.name}] # Fake match
|
||||
result.save()
|
||||
assert result.has_matches is True
|
||||
|
||||
def test_save_ignores_disabled_rules(self):
|
||||
result = self.create_yara_result()
|
||||
rule = self.rule_model.objects.create(
|
||||
name='some rule name', scanner=result.scanner, is_active=False
|
||||
)
|
||||
|
||||
result.has_matches = None
|
||||
result.results = [{'rule': rule.name}] # Fake match
|
||||
result.save()
|
||||
assert result.has_matches is False
|
||||
|
||||
def test_extract_rule_names_with_no_yara_results(self):
|
||||
result = self.create_yara_result()
|
||||
assert result.extract_rule_names() == []
|
||||
|
@ -166,29 +143,6 @@ class TestScannerResultMixin:
|
|||
result = self.create_customs_result()
|
||||
assert result.get_git_repository() is None
|
||||
|
||||
def test_can_report_feedback(self):
|
||||
result = self.create_customs_result()
|
||||
assert result.can_report_feedback()
|
||||
|
||||
def test_can_report_feedback_is_false_when_state_is_not_unknown(self):
|
||||
result = self.create_customs_result()
|
||||
result.state = FALSE_POSITIVE
|
||||
assert not result.can_report_feedback()
|
||||
|
||||
def test_can_report_feedback_is_false_when_scanner_is_mad(self):
|
||||
result = self.create_mad_result()
|
||||
assert not result.can_report_feedback()
|
||||
|
||||
def test_can_revert_feedback_for_triaged_result(self):
|
||||
result = self.create_yara_result()
|
||||
result.state = FALSE_POSITIVE
|
||||
assert result.can_revert_feedback()
|
||||
|
||||
def test_cannot_revert_feedback_for_untriaged_result(self):
|
||||
result = self.create_yara_result()
|
||||
assert result.state == UNKNOWN
|
||||
assert not result.can_revert_feedback()
|
||||
|
||||
def test_get_files_by_matched_rules_with_no_yara_results(self):
|
||||
result = self.create_yara_result()
|
||||
assert result.get_files_by_matched_rules() == {}
|
||||
|
@ -286,13 +240,9 @@ class TestScannerResult(TestScannerResultMixin, TestCase):
|
|||
channel=amo.CHANNEL_LISTED,
|
||||
)
|
||||
|
||||
def create_customs_result(self):
|
||||
upload = self.create_file_upload()
|
||||
return self.model.objects.create(upload=upload, scanner=CUSTOMS)
|
||||
|
||||
def create_yara_result(self):
|
||||
upload = self.create_file_upload()
|
||||
return self.model.objects.create(upload=upload, scanner=YARA)
|
||||
def create_result(self, *args, **kwargs):
|
||||
kwargs['upload'] = self.create_file_upload()
|
||||
return super().create_result(*args, **kwargs)
|
||||
|
||||
def test_create(self):
|
||||
upload = self.create_file_upload()
|
||||
|
@ -324,12 +274,67 @@ class TestScannerResult(TestScannerResultMixin, TestCase):
|
|||
|
||||
assert result.upload is None
|
||||
|
||||
def test_can_report_feedback(self):
|
||||
result = self.create_customs_result()
|
||||
assert result.can_report_feedback()
|
||||
|
||||
def test_can_report_feedback_is_false_when_state_is_not_unknown(self):
|
||||
result = self.create_customs_result()
|
||||
result.state = FALSE_POSITIVE
|
||||
assert not result.can_report_feedback()
|
||||
|
||||
def test_can_report_feedback_is_false_when_scanner_is_mad(self):
|
||||
result = self.create_mad_result()
|
||||
assert not result.can_report_feedback()
|
||||
|
||||
def test_can_revert_feedback_for_triaged_result(self):
|
||||
result = self.create_yara_result()
|
||||
result.state = FALSE_POSITIVE
|
||||
assert result.can_revert_feedback()
|
||||
|
||||
def test_cannot_revert_feedback_for_untriaged_result(self):
|
||||
result = self.create_yara_result()
|
||||
assert result.state == UNKNOWN
|
||||
assert not result.can_revert_feedback()
|
||||
|
||||
def test_save_set_has_matches(self):
|
||||
result = self.create_yara_result()
|
||||
rule = self.rule_model.objects.create(
|
||||
name='some rule name', scanner=result.scanner
|
||||
)
|
||||
|
||||
result.has_matches = None
|
||||
result.save()
|
||||
assert result.has_matches is False
|
||||
|
||||
result.has_matches = None
|
||||
result.results = [{'rule': rule.name}] # Fake match
|
||||
result.save()
|
||||
assert result.has_matches is True
|
||||
|
||||
def test_save_ignores_disabled_rules(self):
|
||||
result = self.create_yara_result()
|
||||
rule = self.rule_model.objects.create(
|
||||
name='some rule name', scanner=result.scanner, is_active=False
|
||||
)
|
||||
|
||||
result.has_matches = None
|
||||
result.results = [{'rule': rule.name}] # Fake match
|
||||
result.save()
|
||||
assert result.has_matches is False
|
||||
|
||||
|
||||
class TestScannerQueryResult(TestScannerResultMixin, TestCase):
|
||||
__test__ = True
|
||||
model = ScannerQueryResult
|
||||
rule_model = ScannerQueryRule
|
||||
|
||||
def create_result(self, *args, **kwargs):
|
||||
# We can't save ScannerQueryResults in database without a rule, so for
|
||||
# this test class create_result() is overridden to not save the result
|
||||
# initially - it will be saved later when adding the result data.
|
||||
return self.model(*args, **kwargs)
|
||||
|
||||
|
||||
class TestScannerRuleMixin:
|
||||
__test__ = False
|
||||
|
|
Загрузка…
Ссылка в новой задаче