Convert IP addresses to strings in MySQL before calling `GROUP_CONCAT()` (#19786)

* Convert IP addresses to strings in MySQL before calling GROUP_CONCAT()

This avoids issues with commas inside the bytestring representation
of the binary value, which can happen for some IPs.
This commit is contained in:
Mathieu Pillard 2022-10-12 17:45:28 +02:00 коммит произвёл GitHub
Родитель 94d8e57e78
Коммит 92979934a4
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 19 добавлений и 11 удалений

Просмотреть файл

@ -7,6 +7,8 @@ from urllib.parse import urljoin
from django.conf import settings
from django.core.files.storage import default_storage as storage
from django.db import models
from django.db.models.expressions import Func
from django.db.models.fields import CharField
from django.db.models.fields.related_descriptors import ManyToManyDescriptor
from django.db.models.query import ModelIterable
from django.urls import resolve, reverse
@ -633,3 +635,8 @@ class FilterableManyToManyField(models.fields.related.ManyToManyField):
class GroupConcat(models.Aggregate):
function = 'GROUP_CONCAT'
allow_distinct = True
class Inet6Ntoa(Func):
function = 'INET6_NTOA'
output_field = CharField()

Просмотреть файл

@ -6,7 +6,6 @@ from django import http
from django.contrib import admin, messages
from django.contrib.admin.utils import unquote
from django.db.models import (
CharField,
Count,
F,
FilteredRelation,
@ -37,7 +36,7 @@ from olympia.amo.admin import (
SEARCH_VAR,
)
from olympia.amo.fields import IPAddressBinaryField
from olympia.amo.models import GroupConcat
from olympia.amo.models import GroupConcat, Inet6Ntoa
from olympia.api.models import APIKey, APIKeyConfirmation
from olympia.bandwagon.models import Collection
from olympia.ratings.models import Rating
@ -242,11 +241,8 @@ class UserAdmin(CommaSearchInAdminMixin, admin.ModelAdmin):
# FilteredRelation we can force it...
annotations = {
'activity_ips': GroupConcat(
'activitylog__iplog__ip_address_binary',
Inet6Ntoa('activitylog__iplog__ip_address_binary'),
distinct=True,
# You can't have multiple ip addresses in an IPv[4|6]Address object
# so we store the binary values in a CharField instead.
output_field=CharField(),
),
'activitylog_filtered': FilteredRelation(
'activitylog__iplog', condition=condition
@ -482,11 +478,12 @@ class UserAdmin(CommaSearchInAdminMixin, admin.ModelAdmin):
# extra queries for each row of results), otherwise, look everywhere
# we can.
activity_ips = getattr(obj, 'activity_ips', None)
to_ipaddress = IPAddressBinaryField().to_python
if activity_ips is not None:
# The GroupConcat value is a comma seperated string of the binary values.
ip_addresses = {to_ipaddress(ip) for ip in activity_ips.split(b',')}
# The GroupConcat value is a comma seperated string of the ip
# addresses (already converted to string thanks to INET6_NTOA).
ip_addresses = set(activity_ips.split(','))
else:
to_ipaddress = IPAddressBinaryField().to_python
ip_addresses = set(
Rating.objects.filter(user=obj)
.values_list('ip_address', flat=True)

Просмотреть файл

@ -127,7 +127,11 @@ class TestUserAdmin(TestCase):
self.user.update(email='foo@bar.com')
# That will make self.user match our query.
ActivityLog.create(amo.LOG.LOG_IN, user=self.user)
with core.override_remote_addr('127.0.0.3'):
# 127.0.0.44 packed is b'\x7f\x00\x00,' - it's useful to test since it
# contains a comma character, which would trip the split(',') we do on
# the result of the GROUP_CONCAT if the binary value wasn't translated
# to an IP address string representation before being grouped.
with core.override_remote_addr('127.0.0.44'):
# Add another login from a different IP to make sure we show it.
ActivityLog.create(amo.LOG.LOG_IN, user=self.user)
with core.override_remote_addr('127.0.0.1'):
@ -145,7 +149,7 @@ class TestUserAdmin(TestCase):
# Make sure it's the right user.
assert doc('.field-email').text() == self.user.email
# Make sure login ip is now displayed, and has the right value.
assert doc('.field-known_ip_adresses').text() == '127.0.0.2\n127.0.0.3'
assert doc('.field-known_ip_adresses').text() == '127.0.0.2\n127.0.0.44'
def test_search_for_single_ip_multiple_results_for_different_reasons(self):
user = user_factory(email='someone@mozilla.com')