add in hash and make sure we can find it when email changes (bug 691596)

This commit is contained in:
Andy McKay 2011-10-04 16:01:26 -07:00
Родитель 0afb2d6a05
Коммит aa543b7131
9 изменённых файлов: 120 добавлений и 13 удалений

Просмотреть файл

@ -1,5 +1,7 @@
# -*- coding: utf8 -*-
import collections
import hashlib
import hmac
import itertools
import json
import os
@ -35,6 +37,7 @@ from translations.fields import (TranslatedField, PurifiedField,
LinkifiedField, Translation)
from translations.query import order_by_translation
from users.models import UserProfile, PersonaAuthor, UserForeignKey
from users.utils import find_users
from versions.compare import version_int
from versions.models import Version
@ -1017,6 +1020,27 @@ class Addon(amo.models.OnChangeMixin, amo.models.ModelBase):
"""Return all the add-ons this add-on depends on."""
return list(self.dependencies.all()[:9])
def get_watermark_hash(self, user):
"""
Create a hash for the addon using the user and addon. Suitable for
receipts or addon updates.
"""
keys = [user.pk, time.mktime(user.created.timetuple()),
self.pk, time.mktime(self.created.timetuple())]
return hmac.new(settings.WATERMARK_SECRET_KEY,
''.join(map(str, keys)),
hashlib.sha512).hexdigest()
def get_user_from_hash(self, email, hsh):
"""
Will try and match the watermark hash against a series of users,
based on any users who has had the addon. Will return the user
if it's found the person, otherwise None.
"""
for user in find_users(email):
if hsh == self.get_watermark_hash(user):
return user
@receiver(dbsignals.post_save, sender=Addon,
dispatch_uid='addons.update.name.table')

Просмотреть файл

@ -1710,3 +1710,44 @@ class TestAddonPurchase(amo.tests.TestCase):
def test_anonymous(self):
assert not self.addon.has_purchased(None)
assert not self.addon.has_purchased(AnonymousUser)
class TestWatermarkHash(amo.tests.TestCase):
fixtures = ['base/addon_3615', 'base/users']
def setUp(self):
self.addon = Addon.objects.get(pk=3615)
self.user = UserProfile.objects.get(email='regular@mozilla.com')
def test_watermark_change_email(self):
hsh = self.addon.get_watermark_hash(self.user)
self.user.update(email='foo@bar.com')
eq_(hsh, self.addon.get_watermark_hash(self.user))
def test_check_hash(self):
hsh = self.addon.get_watermark_hash(self.user)
eq_(self.user, self.addon.get_user_from_hash(self.user.email, hsh))
def test_check_hash_messed(self):
hsh = self.addon.get_watermark_hash(self.user)
hsh = hsh + 'asd'
eq_(None, self.addon.get_user_from_hash(self.user.email, hsh))
def test_check_user_change(self):
self.user.update(email='foo@bar.com')
hsh = self.addon.get_watermark_hash(self.user)
eq_(self.user,
self.addon.get_user_from_hash('regular@mozilla.com', hsh))
def test_check_user_multiple(self):
hsh = self.addon.get_watermark_hash(self.user)
self.user.update(email='foo@bar.com')
UserProfile.objects.create(email='regular@mozilla.com')
eq_(self.user,
self.addon.get_user_from_hash('regular@mozilla.com', hsh))
def test_cant_takeover(self):
hsh = self.addon.get_watermark_hash(self.user)
self.user.delete()
UserProfile.objects.create(email='regular@mozilla.com')
eq_(None, self.addon.get_user_from_hash('regular@mozilla.com', hsh))

Просмотреть файл

@ -323,3 +323,4 @@ CONTRIB_TYPE_DEFAULT = CONTRIB_VOLUNTARY
# Used to watermark addons install.rdf and update.
WATERMARK_KEY = 'purchaser'
WATERMARK_KEY_HASH = '%s-hash' % WATERMARK_KEY

Просмотреть файл

@ -1,5 +1,6 @@
from datetime import datetime, timedelta
import hashlib
import hmac
import json
import os
import posixpath
@ -369,7 +370,7 @@ class File(amo.models.OnChangeMixin, amo.models.ModelBase):
try:
install = inzip.extract_path('install.rdf')
data = RDF(install)
data.set(user.email)
data.set(user.email, self.version.addon.get_watermark_hash(user))
except Exception, e:
log.error('Could not alter install.rdf in file: %s for %s, %s'
% (self.pk, user.pk, e))

Просмотреть файл

@ -7,11 +7,13 @@ import shutil
import stat
import tempfile
import time
import urllib
from xml.parsers import expat
import zipfile
from django import forms
from django.conf import settings
from django.utils.encoding import smart_str
import mock
import path
@ -871,7 +873,7 @@ class TestWatermark(amo.tests.TestCase, amo.tests.AMOPaths):
def test_install_rdf(self):
self.user.update(email='a@a.com')
data = self.file.watermark_install_rdf(self.user)
eq_(self.user.email in str(data), True)
assert urllib.quote_plus(self.user.email) in str(data)
@mock.patch('files.utils.SafeUnzip.extract_path')
def test_install_rdf_no_data(self, extract_path):
@ -884,19 +886,25 @@ class TestWatermark(amo.tests.TestCase, amo.tests.AMOPaths):
data = self.file.watermark_install_rdf(self.user)
self.file.write_watermarked_addon(self.dest, data)
assert os.path.exists(self.dest)
eq_(self.user.email in
self.get_updateURL(self.get_rdf(self.dest)), True)
encoded = urllib.quote_plus(self.user.email)
assert encoded in self.get_updateURL(self.get_rdf(self.dest))
def test_watermark(self):
tmp = self.file.watermark(self.user)
eq_(self.user.email in
self.get_updateURL(self.get_rdf(tmp)), True)
encoded = urllib.quote_plus(self.user.email)
assert encoded in self.get_updateURL(self.get_rdf(tmp))
def test_watermark_hash(self):
tmp = self.file.watermark(self.user)
wm = self.file.version.addon.get_watermark_hash(self.user)
hsh = '&%s=%s' % (amo.WATERMARK_KEY_HASH, wm)
assert hsh in self.get_updateURL(self.get_rdf(tmp))
def test_watermark_unicode(self):
self.user.email = u'Strauß@Magyarország.com'
tmp = self.file.watermark(self.user)
eq_(self.user.email in
self.get_updateURL(self.get_rdf(tmp)), True)
encoded = urllib.quote_plus(smart_str(self.user.email))
assert encoded in self.get_updateURL(self.get_rdf(tmp))
def test_watermark_no_description(self):
self.assertRaises(IndexError, self.get_extract, """
@ -913,7 +921,9 @@ class TestWatermark(amo.tests.TestCase, amo.tests.AMOPaths):
<em:updateURL>http://my.other.site/</em:updateURL>
</Description>
</RDF>""")
eq_(self.user.email in self.get_updateURL(self.get_rdf(tmp)), True)
encoded = urllib.quote_plus(smart_str(self.user.email))
assert encoded in self.get_updateURL(self.get_rdf(tmp))
def test_watermark_overwrites_multiple(self):
tmp = self.get_extract("""
@ -925,7 +935,8 @@ class TestWatermark(amo.tests.TestCase, amo.tests.AMOPaths):
<em:updateURL>http://my.other.other.site/</em:updateURL>
</Description>
</RDF>""")
eq_(self.user.email in self.get_updateURL(self.get_rdf(tmp)), True)
encoded = urllib.quote_plus(smart_str(self.user.email))
assert encoded in self.get_updateURL(self.get_rdf(tmp))
# one close and one open
eq_(str(self.get_rdf(tmp)).count('updateURL'), 2)

Просмотреть файл

@ -18,6 +18,7 @@ from zipfile import BadZipfile
from django import forms
from django.conf import settings
from django.utils.http import urlencode
from django.utils.translation import trans_real as translation
import rdflib
@ -530,7 +531,7 @@ class RDF(object):
self.dom.writexml(buf, encoding="utf-8")
return buf.getvalue().encode('utf-8')
def set(self, value):
def set(self, user, hsh):
parent = (self.dom.documentElement
.getElementsByTagName('Description')[0])
existing = parent.getElementsByTagName('em:updateURL')
@ -539,7 +540,9 @@ class RDF(object):
parent.removeChild(current)
current.unlink()
value = u'%s&%s=%s' % (default, amo.WATERMARK_KEY, value)
qs = urlencode({amo.WATERMARK_KEY: user,
amo.WATERMARK_KEY_HASH: hsh})
value = u'%s&%s' % (default, qs)
elem = self.dom.createElement('em:updateURL')
elem.appendChild(self.dom.createTextNode(value))
parent.appendChild(elem)

Просмотреть файл

@ -19,6 +19,7 @@ from reviews.models import Review
from users.models import (UserProfile, get_hexdigest, BlacklistedEmailDomain,
BlacklistedPassword, BlacklistedUsername,
UserEmailField)
from users.utils import find_users
class TestUserProfile(amo.tests.TestCase):
@ -241,3 +242,17 @@ class TestUserHistory(amo.tests.TestCase):
eq_(user.history.count(), 1)
user.update(email='foopy@barby.com')
eq_(user.history.count(), 1)
def test_user_find(self):
user = UserProfile.objects.create(email='luke@jedi.com')
user.update(email='dark@sith.com')
eq_([user], list(find_users('luke@jedi.com')))
eq_([user], list(find_users('dark@sith.com')))
def test_user_find_multiple(self):
user_1 = UserProfile.objects.create(username='user_1',
email='luke@jedi.com')
user_1.update(email='dark@sith.com')
user_2 = UserProfile.objects.create(username='user_2',
email='luke@jedi.com')
eq_([user_1, user_2], list(find_users('luke@jedi.com')))

Просмотреть файл

@ -3,6 +3,7 @@ import hashlib
import time
from django.conf import settings
from django.db.models import Q
import commonware.log
@ -90,3 +91,12 @@ def get_task_user():
cron jobs or long running tasks.
"""
return UserProfile.objects.get(pk=settings.TASK_USER_ID)
def find_users(email):
"""
Given an email find all the possible users, by looking in
users and in their history.
"""
return UserProfile.objects.filter(Q(email=email) |
Q(history__email=email))

Просмотреть файл

@ -1203,7 +1203,8 @@ WATERMARK_REUSE_SECONDS = 1800
# by a cron. Setting this far apart from the reuse flag so that we
# shouldn't have an overlap.
WATERMARK_CLEANUP_SECONDS = 3600
# Used in providing a hash for the download of the addon.
WATERMARK_SECRET_KEY = ''
CSRF_FAILURE_VIEW = 'amo.views.csrf_failure'