Submit bloom filter as kinto attachment with cron job (#13659)

* refactor generateMLBF and export_blocklist for generic use

* submit mlbf as attachment to kinto api

* waffle cron job; switch to secrets.token_hex; use Version.unfiltered

* update cron tests
This commit is contained in:
Andrew Williamson 2020-03-09 19:06:42 +00:00 коммит произвёл GitHub
Родитель ad4c83331b
Коммит 8c599abe59
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 368 добавлений и 183 удалений

Просмотреть файл

@ -120,8 +120,6 @@ celery==4.4.1 \
botocore==1.15.16 \
--hash=sha256:1109f36e658de2097d1e466842d6634a6b66bb9d3779abe16698171360e1ae5f \
--hash=sha256:39e903e1d1ae862e469b4d5f15dc6770a7c9c81da9fcffb1a40f551ea36acd35
bsdiff4==1.1.9 \
--hash=sha256:a7c48ec58dc1c4a2b9fb5da05637524d5dbb1643a3df7cdc6ed105636909ff79
# chardet is required by requests
chardet==3.0.4 \
--hash=sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691 \

Просмотреть файл

@ -17,8 +17,8 @@ HOME=/tmp
50 * * * * %(z_cron)s cleanup_extracted_file
55 * * * * %(z_cron)s unhide_disabled_files
# Twice per day
35 18,6 * * * %(z_cron)s cleanup_image_files
# Four times per day
35 18,12,6,0 * * * %(z_cron)s upload_mlbf_to_kinto
# Once per day
1 6 * * * %(django)s clear_old_last_login_ip

Просмотреть файл

@ -0,0 +1,31 @@
import json
import tempfile
import waffle
import olympia.core.logger
from olympia.lib.kinto import KintoServer
from .mlbf import generate_mlbf, get_mlbf_key_format
from .utils import KINTO_BUCKET, KINTO_COLLECTION_MLBF
log = olympia.core.logger.getLogger('z.cron')
def upload_mlbf_to_kinto():
if not waffle.switch_is_active('blocklist_mlbf_submit'):
log.info('Upload MLBF to kinto cron job disabled.')
return
log.info('Starting Upload MLBF to kinto cron job.')
server = KintoServer(KINTO_BUCKET, KINTO_COLLECTION_MLBF)
stats = {}
key_format = get_mlbf_key_format()
bloomfilter = generate_mlbf(stats, key_format)
with tempfile.NamedTemporaryFile() as filter_file:
bloomfilter.tofile(filter_file)
filter_file.seek(0)
# TODO: sign filter blob
data = {'key_format': key_format}
attachment = ('filter.bin', filter_file, 'application/octet-stream')
server.publish_attachment(data, attachment)
log.info(json.dumps(stats))

Просмотреть файл

@ -1,4 +1,3 @@
import bsdiff4
import json
import os
@ -8,10 +7,7 @@ from django.core.files.storage import default_storage
import olympia.core.logger
from olympia.addons.models import Addon
from olympia.blocklist.models import Block
from olympia.blocklist.utils import generateMLBF
from olympia.files.models import File
from olympia.blocklist.mlbf import generate_mlbf, get_mlbf_key_format
log = olympia.core.logger.getLogger('z.amo.blocklist')
@ -23,20 +19,15 @@ class Command(BaseCommand):
def add_arguments(self, parser):
"""Handle command arguments."""
parser.add_argument(
'--capacity',
type=float,
default='1.1',
dest='capacity',
help='MLBF capacity.')
'--salt',
type=int,
default=None,
dest='salt',
help='Bloom filter salt')
parser.add_argument(
'id',
help="CT baseline identifier",
metavar=('ID'))
parser.add_argument(
'--previous-id',
help="Previous identifier to use for diff",
metavar=('DIFFID'),
default=None)
parser.add_argument(
'--addon-guids-input',
help='Path to json file with [[guid, version],...] data for all '
@ -50,42 +41,12 @@ class Command(BaseCommand):
'the database',
default=None)
def get_blocked_guids(self):
blocks = Block.objects.all()
blocks_guids = [block.guid for block in blocks]
addons_dict = Addon.unfiltered.in_bulk(blocks_guids, field_name='guid')
for block in blocks:
block.addon = addons_dict.get(block.guid)
Block.preload_addon_versions(blocks)
all_versions = {}
# First collect all the blocked versions
for block in blocks:
is_all_versions = (
block.min_version == Block.MIN and
block.max_version == Block.MAX)
versions = {
version_id: (block.guid, version)
for version, (version_id, _) in block.addon_versions.items()
if is_all_versions or block.is_version_blocked(version)}
all_versions.update(versions)
# Now we need the cert_ids
cert_nums = File.objects.filter(
version_id__in=all_versions.keys()).values_list(
'version_id', 'cert_serial_num')
return [
(all_versions[version_id][0], all_versions[version_id][1], cert_nm)
for version_id, cert_nm in cert_nums]
def get_all_guids(self):
return File.objects.values_list(
'version__addon__guid', 'version__version', 'cert_serial_num')
def load_json(self, json_path):
with open(json_path) as json_file:
data = json.load(json_file)
return [tuple(record) for record in data]
def save_blocklist(self, stats, mlbf, id_, previous_id=None):
def save_blocklist(self, stats, mlbf, id_):
out_file = os.path.join(settings.TMP_PATH, 'mlbf', id_, 'filter')
meta_file = os.path.join(settings.TMP_PATH, 'mlbf', id_, 'filter.meta')
@ -100,44 +61,21 @@ class Command(BaseCommand):
mlbf.saveDiffMeta(mlbf_meta_file)
stats['mlbf_metafilesize'] = os.stat(meta_file).st_size
if previous_id:
diff_base_file = os.path.join(
settings.TMP_PATH, 'mlbf', str(previous_id), 'filter')
patch_file = os.path.join(
settings.TMP_PATH, 'mlbf', id_, 'filter.patch')
log.info(
"Generating patch file {patch} from {base} to {out}".format(
patch=patch_file, base=diff_base_file,
out=out_file))
bsdiff4.file_diff(
diff_base_file, out_file, patch_file)
stats['mlbf_diffsize'] = os.stat(patch_file).st_size
def handle(self, *args, **options):
log.debug('Exporting blocklist to file')
stats = {}
blocked_guids = (
self.load_json(options.get('block_guids_input'))
if options.get('block_guids_input') else
self.get_blocked_guids())
all_guids = (
self.load_json(options.get('addon_guids_input'))
if options.get('addon_guids_input') else
self.get_all_guids())
not_blocked_guids = list(set(all_guids) - set(blocked_guids))
stats['mlbf_blocked_count'] = len(blocked_guids)
stats['mlbf_unblocked_count'] = len(not_blocked_guids)
generate_kw = {}
if options.get('block_guids_input'):
generate_kw['blocked'] = (
self.load_json(options.get('block_guids_input')))
if options.get('addon_guids_input'):
generate_kw['not_blocked'] = (
self.load_json(options.get('addon_guids_input')))
mlbf = generateMLBF(
stats,
blocked=blocked_guids,
not_blocked=not_blocked_guids,
capacity=options.get('capacity'),
diffMetaFile=None)
mlbf.check(entries=blocked_guids, exclusions=not_blocked_guids)
salt = options.get('salt')
mlbf = generate_mlbf(stats, get_mlbf_key_format(salt), **generate_kw)
self.save_blocklist(
stats,
mlbf,
options.get('id'),
options.get('previous_id'))
options.get('id'))
print(stats)

Просмотреть файл

@ -0,0 +1,84 @@
import math
import secrets
from filtercascade import FilterCascade
import olympia.core.logger
log = olympia.core.logger.getLogger('z.amo.blocklist')
def get_blocked_guids():
from olympia.addons.models import Addon
from olympia.blocklist.models import Block
blocks = Block.objects.all()
blocks_guids = [block.guid for block in blocks]
addons_dict = Addon.unfiltered.in_bulk(blocks_guids, field_name='guid')
for block in blocks:
block.addon = addons_dict.get(block.guid)
Block.preload_addon_versions(blocks)
all_versions = {}
# collect all the blocked versions
for block in blocks:
is_all_versions = (
block.min_version == Block.MIN and
block.max_version == Block.MAX)
versions = {
version_id: (block.guid, version)
for version, (version_id, _) in block.addon_versions.items()
if is_all_versions or block.is_version_blocked(version)}
all_versions.update(versions)
return all_versions.values()
def get_all_guids():
from olympia.versions.models import Version
return Version.unfiltered.values_list('addon__guid', 'version')
def hash_filter_inputs(input_list, key_format):
return [
key_format.format(guid=guid, version=version)
for (guid, version) in input_list]
def get_mlbf_key_format(salt=None):
salt = salt or secrets.token_hex(16)
return '%s:{guid}:{version}' % salt
def generate_mlbf(stats, key_format, *, blocked=None, not_blocked=None):
"""Based on:
https://github.com/mozilla/crlite/blob/master/create_filter_cascade/certs_to_crlite.py
"""
blocked = hash_filter_inputs(
blocked or get_blocked_guids(), key_format)
not_blocked = hash_filter_inputs(
not_blocked or get_all_guids(), key_format)
not_blocked = list(set(not_blocked) - set(blocked))
stats['mlbf_blocked_count'] = len(blocked)
stats['mlbf_unblocked_count'] = len(not_blocked)
fprs = [len(blocked) / (math.sqrt(2) * len(not_blocked)), 0.5]
log.info("Generating filter")
cascade = FilterCascade.cascade_with_characteristics(
int(len(blocked) * 1.1), fprs)
cascade.version = 1
cascade.initialize(include=blocked, exclude=not_blocked)
stats['mlbf_fprs'] = fprs
stats['mlbf_version'] = cascade.version
stats['mlbf_layers'] = cascade.layerCount()
stats['mlbf_bits'] = cascade.bitCount()
log.debug("Filter cascade layers: {layers}, bit: {bits}".format(
layers=cascade.layerCount(), bits=cascade.bitCount()))
cascade.check(entries=blocked, exclusions=not_blocked)
return cascade

Просмотреть файл

@ -1,7 +1,6 @@
import os
import json
from datetime import datetime
from random import randint
from unittest import mock
from django.core.management import call_command
@ -10,13 +9,10 @@ from django.conf import settings
import responses
from olympia import amo
from olympia.amo.tests import (
addon_factory, TestCase, user_factory, version_factory)
from olympia.amo.tests import addon_factory, TestCase, user_factory
from olympia.blocklist.management.commands import import_blocklist
from olympia.files.models import File
from ..models import Block, KintoImport
from ..management.commands import export_blocklist
# This is a fragment of the actual json blocklist file
@ -200,79 +196,25 @@ class TestImportBlocklist(TestCase):
class TestExportBlocklist(TestCase):
def test_db_queries(self):
for idx in range(0, 10):
addon_factory(
file_kw={'cert_serial_num': str(randint(10000, 99999))})
def test_command(self):
for idx in range(0, 5):
addon_factory()
# one version, 0 - *
Block.objects.create(
addon=addon_factory(
file_kw={'cert_serial_num': str(randint(10000, 99999))}),
addon=addon_factory(),
updated_by=user_factory())
# one version, 0 - 9999
Block.objects.create(
addon=addon_factory(
file_kw={'cert_serial_num': str(randint(10000, 99999))}),
addon=addon_factory(),
updated_by=user_factory(),
max_version='9999')
# one version, 0 - *, unlisted
Block.objects.create(
addon=addon_factory(
version_kw={'channel': amo.RELEASE_CHANNEL_UNLISTED},
file_kw={'cert_serial_num': str(randint(10000, 99999))}),
version_kw={'channel': amo.RELEASE_CHANNEL_UNLISTED}),
updated_by=user_factory())
# three versions, but only two within block (123.40, 123.5)
three_ver = Block.objects.create(
addon=addon_factory(
version_kw={'version': '123.40'},
file_kw={'cert_serial_num': 'qwerty1'}),
updated_by=user_factory(), max_version='123.45')
version_factory(
addon=three_ver.addon, version='123.5',
file_kw={'cert_serial_num': 'qwerty2'})
version_factory(
addon=three_ver.addon, version='123.45.1',
file_kw={'cert_serial_num': 'qwerty3'})
# no matching versions (edge cases)
over = Block.objects.create(
addon=addon_factory(file_kw={'cert_serial_num': 'over'}),
updated_by=user_factory(),
max_version='0')
under = Block.objects.create(
addon=addon_factory(file_kw={'cert_serial_num': 'under'}),
updated_by=user_factory(),
min_version='9999')
all_guids = export_blocklist.Command().get_all_guids()
assert len(all_guids) == File.objects.count() == 10 + 8
assert (three_ver.guid, '123.40', 'qwerty1') in all_guids
assert (three_ver.guid, '123.5', 'qwerty2') in all_guids
assert (three_ver.guid, '123.45.1', 'qwerty3') in all_guids
over_tuple = (over.guid, over.addon.current_version.version, 'over')
under_tuple = (
under.guid, under.addon.current_version.version, 'under')
assert over_tuple in all_guids
assert under_tuple in all_guids
blocked_guids = export_blocklist.Command().get_blocked_guids()
assert len(blocked_guids) == 5
assert (three_ver.guid, '123.40', 'qwerty1') in blocked_guids
assert (three_ver.guid, '123.5', 'qwerty2') in blocked_guids
assert (three_ver.guid, '123.45.1', 'qwerty3') not in blocked_guids
assert over_tuple not in blocked_guids
assert under_tuple not in blocked_guids
call_command('export_blocklist', '1')
out_path = os.path.join(settings.TMP_PATH, 'mlbf', '1')
assert os.path.exists(os.path.join(out_path, 'filter'))
assert os.path.exists(os.path.join(out_path, 'filter.meta'))
# Add a new Block and repeat, to get a diff
Block.objects.create(
addon=addon_factory(
file_kw={'cert_serial_num': str(randint(10000, 99999))}),
updated_by=user_factory())
call_command('export_blocklist', '2', previous_id='1')
out_path = os.path.join(settings.TMP_PATH, 'mlbf', '2')
assert os.path.exists(os.path.join(out_path, 'filter'))
assert os.path.exists(os.path.join(out_path, 'filter.meta'))
assert os.path.exists(os.path.join(out_path, 'filter.patch'))

Просмотреть файл

@ -0,0 +1,41 @@
from unittest import mock
import pytest
from waffle.testutils import override_switch
from olympia.amo.tests import addon_factory, user_factory
from olympia.blocklist.cron import upload_mlbf_to_kinto
from olympia.blocklist.mlbf import get_mlbf_key_format
from olympia.blocklist.models import Block
from olympia.lib.kinto import KintoServer
@pytest.mark.django_db
@override_switch('blocklist_mlbf_submit', active=True)
@mock.patch('olympia.blocklist.cron.get_mlbf_key_format')
@mock.patch.object(KintoServer, 'publish_attachment')
def test_upload_mlbf_to_kinto(publish_mock, get_mlbf_key_format_mock):
key_format = get_mlbf_key_format()
get_mlbf_key_format_mock.return_value = key_format
addon_factory()
Block.objects.create(
addon=addon_factory(),
updated_by=user_factory())
upload_mlbf_to_kinto()
publish_mock.assert_called_with(
{'key_format': key_format},
('filter.bin', mock.ANY, 'application/octet-stream'))
@pytest.mark.django_db
@override_switch('blocklist_mlbf_submit', active=False)
@mock.patch.object(KintoServer, 'publish_attachment')
def test_waffle_off_disables_publishing(publish_mock):
addon_factory()
Block.objects.create(
addon=addon_factory(),
updated_by=user_factory())
upload_mlbf_to_kinto()
publish_mock.assert_not_called()

Просмотреть файл

@ -0,0 +1,109 @@
import os
import tempfile
from olympia import amo
from olympia.amo.tests import (
addon_factory, TestCase, user_factory, version_factory)
from olympia.blocklist.models import Block
from olympia.blocklist.mlbf import (
generate_mlbf, get_all_guids, get_blocked_guids, get_mlbf_key_format,
hash_filter_inputs)
from olympia.files.models import File
class TestMLBF(TestCase):
def setUp(self):
for idx in range(0, 10):
addon_factory()
# one version, 0 - *
Block.objects.create(
addon=addon_factory(),
updated_by=user_factory())
# one version, 0 - 9999
Block.objects.create(
addon=addon_factory(),
updated_by=user_factory(),
max_version='9999')
# one version, 0 - *, unlisted
Block.objects.create(
addon=addon_factory(
version_kw={'channel': amo.RELEASE_CHANNEL_UNLISTED}),
updated_by=user_factory())
# three versions, but only two within block (123.40, 123.5)
self.three_ver = Block.objects.create(
addon=addon_factory(
version_kw={'version': '123.40'}),
updated_by=user_factory(), max_version='123.45')
version_factory(
addon=self.three_ver.addon, version='123.5', deleted=True)
version_factory(
addon=self.three_ver.addon, version='123.45.1')
# no matching versions (edge cases)
self.over = Block.objects.create(
addon=addon_factory(),
updated_by=user_factory(),
max_version='0')
self.under = Block.objects.create(
addon=addon_factory(),
updated_by=user_factory(),
min_version='9999')
def test_all_guids(self):
all_guids = get_all_guids()
assert len(all_guids) == File.objects.count() == 10 + 8
assert (self.three_ver.guid, '123.40') in all_guids
assert (self.three_ver.guid, '123.5') in all_guids
assert (self.three_ver.guid, '123.45.1') in all_guids
over_tuple = (self.over.guid, self.over.addon.current_version.version)
under_tuple = (
self.under.guid, self.under.addon.current_version.version)
assert over_tuple in all_guids
assert under_tuple in all_guids
def test_get_blocked_guids(self):
blocked_guids = get_blocked_guids()
assert len(blocked_guids) == 5
assert (self.three_ver.guid, '123.40') in blocked_guids
assert (self.three_ver.guid, '123.5') in blocked_guids
assert (self.three_ver.guid, '123.45.1') not in blocked_guids
over_tuple = (self.over.guid, self.over.addon.current_version.version)
under_tuple = (
self.under.guid, self.under.addon.current_version.version)
assert over_tuple not in blocked_guids
assert under_tuple not in blocked_guids
def test_hash_filter_inputs(self):
data = [
('guid@', '1.0'),
('foo@baa', '999.223a'),
]
assert hash_filter_inputs(data, get_mlbf_key_format(37872)) == [
'37872:guid@:1.0',
'37872:foo@baa:999.223a',
]
def test_generate_mlbf(self):
stats = {}
key_format = '{guid}:{version}'
blocked = [
('guid1@', '1.0'), ('@guid2', '1.0'), ('@guid2', '1.1'),
('guid3@', '0.01b1')]
not_blocked = [
('guid10@', '1.0'), ('@guid20', '1.0'), ('@guid20', '1.1'),
('guid30@', '0.01b1'), ('guid100@', '1.0'), ('@guid200', '1.0'),
('@guid200', '1.1'), ('guid300@', '0.01b1')]
bfilter = generate_mlbf(
stats, key_format, blocked=blocked, not_blocked=not_blocked)
for entry in blocked:
key = key_format.format(guid=entry[0], version=entry[1])
assert key in bfilter
for entry in not_blocked:
key = key_format.format(guid=entry[0], version=entry[1])
assert key not in bfilter
assert stats['mlbf_version'] == 1
assert stats['mlbf_layers'] == 2
assert stats['mlbf_bits'] == 14409
with tempfile.NamedTemporaryFile() as out:
bfilter.tofile(out)
assert os.stat(out.name).st_size == 1824

Просмотреть файл

@ -1,7 +1,3 @@
import math
from filtercascade import FilterCascade
import olympia.core.logger
from olympia import amo
from olympia.activity import log_create
@ -12,6 +8,7 @@ log = olympia.core.logger.getLogger('z.amo.blocklist')
KINTO_BUCKET = 'staging'
KINTO_COLLECTION_LEGACY = 'addons'
KINTO_COLLECTION_MLBF = 'addons-mlbf'
def add_version_log_for_blocked_versions(obj, al):
@ -91,37 +88,6 @@ def splitlines(text):
return [line.strip() for line in str(text or '').splitlines()]
def generateMLBF(stats, *, blocked, not_blocked, capacity, diffMetaFile=None):
"""Based on:
https://github.com/mozilla/crlite/blob/master/create_filter_cascade/certs_to_crlite.py
"""
fprs = [len(blocked) / (math.sqrt(2) * len(not_blocked)), 0.5]
if diffMetaFile is not None:
log.info(
"Generating filter with characteristics from mlbf base file {}".
format(diffMetaFile))
mlbf_meta_file = open(diffMetaFile, 'rb')
cascade = FilterCascade.loadDiffMeta(mlbf_meta_file)
cascade.error_rates = fprs
else:
log.info("Generating filter")
cascade = FilterCascade.cascade_with_characteristics(
int(len(blocked) * capacity), fprs)
cascade.version = 1
cascade.initialize(include=blocked, exclude=not_blocked)
stats['mlbf_fprs'] = fprs
stats['mlbf_version'] = cascade.version
stats['mlbf_layers'] = cascade.layerCount()
stats['mlbf_bits'] = cascade.bitCount()
log.debug("Filter cascade layers: {layers}, bit: {bits}".format(
layers=cascade.layerCount(), bits=cascade.bitCount()))
return cascade
def legacy_publish_blocks(blocks):
server = KintoServer(KINTO_BUCKET, KINTO_COLLECTION_LEGACY)
for block in blocks:

Просмотреть файл

@ -1,4 +1,7 @@
import json
import uuid
from base64 import b64encode
from django.conf import settings
import requests
@ -117,6 +120,41 @@ class KintoServer(object):
self._needs_signoff = True
return response.json().get('data', {})
def publish_attachment(self, data, attachment, kinto_id=None):
"""Publish an attachment to a record on kinto. If `kinto_id` is not
None the existing record will be updated; otherwise a new record will
be created.
`attachment` is a tuple of (filename, file object, content type)"""
self.setup()
if not kinto_id:
log.info('Creating record')
else:
log.info(
'Updating record [%s]' % kinto_id)
headers = self.headers
del headers['Content-Type']
json_data = {'data': json.dumps(data)}
kinto_id = kinto_id or uuid.uuid4()
attach_url = (
f'{settings.KINTO_API_URL}buckets/{self.bucket}/'
f'collections/{self.collection}/records/{kinto_id}/attachment')
files = [('attachment', attachment)]
response = requests.post(
attach_url,
data=json_data,
headers=headers,
files=files)
if response.status_code not in (200, 201):
log.error(
'Creating record for [%s] failed: %s' %
(kinto_id, response.content),
stack_info=True)
raise ConnectionError('Kinto record not created/updated')
self._needs_signoff = True
return response.json().get('data', {})
def delete_record(self, kinto_id):
self.setup()
url = (

Просмотреть файл

@ -1857,6 +1857,8 @@ CRON_JOBS = {
'category_totals': 'olympia.amo.cron',
'weekly_downloads': 'olympia.amo.cron',
'upload_mlbf_to_kinto': 'olympia.blocklist.cron',
'update_blog_posts': 'olympia.devhub.cron',
'cleanup_extracted_file': 'olympia.files.cron',

Просмотреть файл

@ -1,3 +1,6 @@
import tempfile
from unittest import mock
from django.conf import settings
from django.test.utils import override_settings
@ -116,6 +119,39 @@ class TestKintoServer(TestCase):
record = server.publish_record({'something': 'somevalue'}, 'an-id')
assert record == {'id': 'updated'}
@mock.patch('olympia.lib.kinto.uuid')
def test_publish_attachment(self, uuidmock):
uuidmock.uuid4.return_value = 1234567890
server = KintoServer('foo', 'baa')
server._setup_done = True
assert not server._needs_signoff
url = (
settings.KINTO_API_URL +
'buckets/foo/collections/baa/records/1234567890/attachment')
responses.add(
responses.POST,
url,
json={'data': {'id': '1234567890'}})
with tempfile.TemporaryFile() as attachment:
record = server.publish_attachment(
{'something': 'somevalue'}, ('file', attachment))
assert server._needs_signoff
assert record == {'id': '1234567890'}
url = (
settings.KINTO_API_URL +
'buckets/foo/collections/baa/records/an-id/attachment')
responses.add(
responses.POST,
url,
json={'data': {'id': 'an-id'}})
with tempfile.TemporaryFile() as attachment:
record = server.publish_attachment(
{'something': 'somevalue'}, ('otherfile', attachment), 'an-id')
assert record == {'id': 'an-id'}
def test_delete_record(self):
server = KintoServer('foo', 'baa')
server._setup_done = True