This commit is contained in:
Ask Solem 2013-09-10 17:55:09 +01:00
Родитель c56e13ee65
Коммит d08014fde0
21 изменённых файлов: 69 добавлений и 57 удалений

Просмотреть файл

@ -243,7 +243,7 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
state = state or State()
queues = prepare_queues(queues)
consume_from = [_maybe_queue(app, q)
for q in consume_from or queues.keys()]
for q in consume_from or list(queues)]
if isinstance(tasks, string_t):
tasks = set(tasks.split(','))
if tasks is None:

Просмотреть файл

@ -39,17 +39,13 @@ else:
from urllib2 import Request, urlopen # noqa
def maybe_utf8(value): # noqa
"""Encode to utf-8, only if the value is Unicode."""
if isinstance(value, unicode):
return value.encode('utf-8')
return value
def utf8dict(tup): # noqa
"""With a dict's items() tuple return a new dict with any utf-8
keys/values encoded."""
return dict((key.encode('utf-8'), maybe_utf8(value))
for key, value in tup)
return dict(
(k.encode('utf-8'),
v.encode('utf-8') if isinstance(v, unicode) else v)
for k, v in tup)
class InvalidResponseError(Exception):

Просмотреть файл

@ -132,7 +132,7 @@ class test_start_filter(AppCase):
def test_start(self):
with patch('celery.contrib.migrate.eventloop') as evloop:
app = Mock()
filter = Mock(name='filter')
filt = Mock(name='filter')
conn = Connection('memory://')
evloop.side_effect = StopFiltering()
app.amqp.queues = {'foo': Queue('foo'), 'bar': Queue('bar')}
@ -147,26 +147,26 @@ class test_start_filter(AppCase):
consumer.callbacks.append(x)
consumer.register_callback = register_callback
start_filter(app, conn, filter,
start_filter(app, conn, filt,
queues='foo,bar', ack_messages=True)
body = {'task': 'add', 'id': 'id'}
for callback in consumer.callbacks:
callback(body, Message(body))
consumer.callbacks[:] = []
cb = Mock(name='callback=')
start_filter(app, conn, filter, tasks='add,mul', callback=cb)
start_filter(app, conn, filt, tasks='add,mul', callback=cb)
for callback in consumer.callbacks:
callback(body, Message(body))
self.assertTrue(cb.called)
on_declare_queue = Mock()
start_filter(app, conn, filter, tasks='add,mul', queues='foo',
start_filter(app, conn, filt, tasks='add,mul', queues='foo',
on_declare_queue=on_declare_queue)
self.assertTrue(on_declare_queue.called)
start_filter(app, conn, filter, queues=['foo', 'bar'])
start_filter(app, conn, filt, queues=['foo', 'bar'])
consumer.callbacks[:] = []
state = State()
start_filter(app, conn, filter,
start_filter(app, conn, filt,
tasks='add,mul', callback=cb, state=state, limit=1)
stop_filtering_raised = False
for callback in consumer.callbacks:
@ -182,14 +182,14 @@ class test_filter_callback(Case):
def test_filter(self):
callback = Mock()
filter = filter_callback(callback, ['add', 'mul'])
filt = filter_callback(callback, ['add', 'mul'])
t1 = {'task': 'add'}
t2 = {'task': 'div'}
message = Mock()
filter(t2, message)
filt(t2, message)
self.assertFalse(callback.called)
filter(t1, message)
filt(t1, message)
callback.assert_called_with(t1, message)

Просмотреть файл

@ -7,6 +7,11 @@ import re
import sys
import shelve
try:
input = input
except NameError:
input = raw_input # noqa
refre = re.compile(r'``([^`\s]+?)``')
ROLES = (
@ -76,7 +81,7 @@ def fixliterals(fname):
replace_type = None
while replace_type is None:
replace_type = raw_input(
replace_type = input(
colorize("Replace role: ", fg="yellow")).strip().lower()
if replace_type and replace_type not in ROLES:
replace_type = None
@ -94,7 +99,7 @@ def fixliterals(fname):
if default.endswith("()") and \
replace_type in ("class", "func", "meth"):
default = default[:-2]
replace_value = raw_input(
replace_value = input(
colorize("Text <target> [", fg="yellow") + default + \
colorize("]: ", fg="yellow")).strip()
if not replace_value:
@ -154,7 +159,7 @@ def colorize(text='', opts=(), **kwargs):
code_list = []
if text == '' and len(opts) == 1 and opts[0] == 'reset':
return '\x1b[%sm' % RESET
for k, v in kwargs.iteritems():
for k, v in kwargs.items():
if k == 'fg':
code_list.append(foreground[v])
elif k == 'bg':

Просмотреть файл

@ -66,8 +66,8 @@ source_suffix = '.rst'
master_doc = 'index'
# General information about the project.
project = u'Celery'
copyright = u'2009-2013, Ask Solem & Contributors'
project = 'Celery'
copyright = '2009-2013, Ask Solem & Contributors'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
@ -116,8 +116,8 @@ html_use_modindex = True
html_use_index = True
latex_documents = [
('index', 'Celery.tex', ur'Celery Documentation',
ur'Ask Solem & Contributors', 'manual'),
('index', 'Celery.tex', 'Celery Documentation',
'Ask Solem & Contributors', 'manual'),
]
html_theme = "celery"

Просмотреть файл

@ -12,6 +12,7 @@ module (when this is installed all name lookups will be asynchronous)::
$ pip install eventlet
$ pip install dnspython
$ pip install requests
Before you run any of the example tasks you need to start
the worker::

Просмотреть файл

@ -1,12 +1,13 @@
import requests
from celery import task
from eventlet.green import urllib2
@task()
def urlopen(url):
print('Opening: {0}'.format(url))
try:
body = urllib2.urlopen(url).read()
response = requests.get(url)
except Exception as exc:
print('URL {0} gave error: {1!r}'.format(url, exc))
return len(body)
return len(response.text)

Просмотреть файл

@ -24,11 +24,16 @@ to "zlib", and the serializer to "pickle".
import re
import time
import urlparse
try:
from urllib.parse import urlsplit
except ImportError:
from urlparse import urlsplit
import requests
from celery import task, group
from eventlet import Timeout
from eventlet.green import urllib2
from pybloom import BloomFilter
@ -39,7 +44,7 @@ url_regex = re.compile(
def domain(url):
"""Returns the domain part of an URL."""
return urlparse.urlsplit(url)[1].split(':')[0]
return urlsplit(url)[1].split(':')[0]
@task(ignore_result=True, serializer='pickle', compression='zlib')
@ -50,13 +55,13 @@ def crawl(url, seen=None):
with Timeout(5, False):
try:
data = urllib2.urlopen(url).read()
except (urllib2.HTTPError, IOError):
response = requests.get(url)
except Exception:
return
location = domain(url)
wanted_urls = []
for url_match in url_regex.finditer(data):
for url_match in url_regex.finditer(response.text):
url = url_match.group(0)
# To not destroy the internet, we only fetch URLs on the same domain.
if url not in seen and location in domain(url):
@ -64,4 +69,4 @@ def crawl(url, seen=None):
seen.add(url)
subtasks = group(crawl.s(url, seen) for url in wanted_urls)
subtasks.apply_async()
subtasks()

Просмотреть файл

@ -1,4 +1,4 @@
import urllib2
import requests
from celery import task
@ -7,7 +7,7 @@ from celery import task
def urlopen(url):
print('Opening: {0}'.format(url))
try:
body = urllib2.urlopen(url).read()
_response = requests.get(url)
except Exception as exc:
print('Exception for {0}: {1!r}'.format(url, exc))
return url, 0

Просмотреть файл

@ -1,7 +1,7 @@
#!/usr/bin/env python
from django.core.management import execute_manager
try:
import settings # Assumed to be in the same directory.
from . import settings # Assumed to be in the same directory.
except ImportError:
import sys
sys.stderr.write(

Просмотреть файл

@ -1,5 +1,5 @@
from django.conf.urls.defaults import *
import views
from . import views
# Uncomment the next two lines to enable the admin:
# from django.contrib import admin

Просмотреть файл

@ -21,7 +21,7 @@ def proper_name(name):
def find_missing_authors(seen):
with open("AUTHORS") as authors:
known = map(author, authors.readlines())
known = [author(line) for line in authors.readlines()]
seen_authors = set(filter(proper_name, (t[0] for t in seen)))
seen_emails = set(t[1] for t in seen)
@ -32,5 +32,5 @@ def find_missing_authors(seen):
if __name__ == "__main__":
find_missing_authors(map(author, fileinput.input()))
find_missing_authors([author(line) for line in fileinput.input()])

Просмотреть файл

@ -14,6 +14,8 @@ from tempfile import NamedTemporaryFile
rq = lambda s: s.strip("\"'")
str_t = str if sys.version_info[0] >= 3 else basestring
def cmd(*args):
return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0]
@ -23,7 +25,7 @@ def cmd(*args):
def no_enoent():
try:
yield
except OSError, exc:
except OSError as exc:
if exc.errno != errno.ENOENT:
raise
@ -56,7 +58,7 @@ class TupleVersion(object):
v = list(v)
def quote(lit):
if isinstance(lit, basestring):
if isinstance(lit, str_t):
return '"{0}"'.format(lit)
return str(lit)

Просмотреть файл

@ -3,6 +3,8 @@ import os
import re
import sys
from collections import Callable
dirname = ""
RE_CODE_BLOCK = re.compile(r'.. code-block:: (.+?)\s*$')
@ -51,7 +53,7 @@ def _process(lines):
lines = list(lines) # non-destructive
for i, line in enumerate(lines):
for regex, alt in TO_RST_MAP.items():
if callable(alt):
if isinstance(alt, Callable):
match = regex.match(line)
if match:
alt(lines, i, match)

Просмотреть файл

@ -1,6 +1,6 @@
from __future__ import print_function
from fileinput import input
from fileinput import input as _input
from sys import exit, stderr
from celery.app.defaults import NAMESPACES, flatten
@ -28,7 +28,7 @@ def find_undocumented_settings(directive='.. setting:: '):
settings = dict(flatten(NAMESPACES))
all = set(settings)
documented = set(line.strip()[len(directive):].strip()
for line in input()
for line in _input()
if line.strip().startswith(directive))
return [setting for setting in all ^ documented
if not is_ignored(setting, settings[setting])]

Просмотреть файл

@ -1,11 +1,10 @@
from celery import current_app, task, uuid
from celery.five import range
from celery.five import Queue, range
from celery.worker.consumer import Consumer
from celery.worker.job import Request
from celery.concurrency.solo import TaskPool
from celery.app.amqp import TASK_BARE
from time import time
from Queue import Queue
from librabbitmq import Message
import socket
import sys

Просмотреть файл

@ -1,12 +1,11 @@
from celery import current_app, task, uuid
from celery.five import range
from celery.five import Queue, range
from celery.worker.consumer import Consumer
#from celery.worker.job import Request
from celery.app.task import Context
from celery.concurrency.solo import TaskPool
from celery.app.amqp import TASK_BARE
from time import time
from Queue import Queue
from librabbitmq import Message
from celery.utils.functional import noop
from celery.worker.job import NEEDS_KWDICT

Просмотреть файл

@ -4,6 +4,7 @@ from __future__ import absolute_import
import sys
from time import sleep
from celery.five import range
from celery.utils import timer2 as timer
def noop(*args, **kwargs):
@ -11,7 +12,7 @@ def noop(*args, **kwargs):
def insert(s, n=100000):
for i in xrange(n):
for i in range(n):
s.apply_after(1 + (i and i / 10.0), noop, (i, ))

Просмотреть файл

@ -1,11 +1,10 @@
from celery import current_app, task, uuid
from celery.five import range
from celery.five import Queue, range
from celery.worker.consumer import Consumer
from celery.worker.job import Request
from celery.concurrency.solo import TaskPool
from celery.app.amqp import TASK_BARE
from time import time
from Queue import Queue
from librabbitmq import Message
import socket
import sys

Просмотреть файл

@ -117,7 +117,7 @@ class Suite(object):
)
def manyshort(self):
self.join(group(add.s(i, i) for i in xrange(1000))(), propagate=True)
self.join(group(add.s(i, i) for i in range(1000))(), propagate=True)
def runtest(self, fun, n=50, index=0, repeats=1):
with blockdetection(self.block_timeout):

Просмотреть файл

@ -145,10 +145,12 @@ py_version = sys.version_info
def strip_comments(l):
return l.split('#', 1)[0].strip()
def reqs(*f):
return list(filter(None, [strip_comments(l) for l in open(
os.path.join(os.getcwd(), 'requirements', *f)).readlines()]))
return [
r for r in (
strip_comments(l) for l in open(
os.path.join(os.getcwd(), 'requirements', *f)).readlines()
) if r]
install_requires = reqs('default.txt')
if JYTHON: